You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/01/04 19:23:08 UTC

[2/3] orc git commit: ORC-58. Split curent Reader into RowReader and Reader (Deepak Majeti via omalley)

http://git-wip-us.apache.org/repos/asf/orc/blob/6bf63bb1/c++/src/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 91f4ea1..3c618b2 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -16,21 +16,16 @@
  * limitations under the License.
  */
 
-#include "orc/Int128.hh"
-#include "orc/OrcFile.hh"
-#include "orc/Reader.hh"
 
-#include "Adaptor.hh"
-#include "ColumnReader.hh"
-#include "Exceptions.hh"
-#include "RLE.hh"
-#include "TypeImpl.hh"
+#include "Options.hh"
+#include "Reader.hh"
+#include "Statistics.hh"
+#include "StripeStream.hh"
 
 #include "wrap/coded-stream-wrapper.h"
 
 #include <algorithm>
 #include <iostream>
-#include <limits>
 #include <memory>
 #include <sstream>
 #include <string>
@@ -80,1213 +75,160 @@ namespace orc {
     return buffer.str();
   }
 
-  enum ColumnSelection {
-    ColumnSelection_NONE = 0,
-    ColumnSelection_NAMES = 1,
-    ColumnSelection_FIELD_IDS = 2,
-    ColumnSelection_TYPE_IDS = 3,
-  };
-
-  struct ReaderOptionsPrivate {
-    ColumnSelection selection;
-    std::list<uint64_t> includedColumnIndexes;
-    std::list<std::string> includedColumnNames;
-    uint64_t dataStart;
-    uint64_t dataLength;
-    uint64_t tailLocation;
-    bool throwOnHive11DecimalOverflow;
-    int32_t forcedScaleOnHive11Decimal;
-    std::ostream* errorStream;
-    MemoryPool* memoryPool;
-    std::string serializedTail;
-
-    ReaderOptionsPrivate() {
-      selection = ColumnSelection_NONE;
-      dataStart = 0;
-      dataLength = std::numeric_limits<uint64_t>::max();
-      tailLocation = std::numeric_limits<uint64_t>::max();
-      throwOnHive11DecimalOverflow = true;
-      forcedScaleOnHive11Decimal = 6;
-      errorStream = &std::cerr;
-      memoryPool = getDefaultPool();
+  uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
+    if (ps.has_compressionblocksize()) {
+      return ps.compressionblocksize();
+    } else {
+      return 256 * 1024;
     }
-  };
-
-  ReaderOptions::ReaderOptions():
-    privateBits(std::unique_ptr<ReaderOptionsPrivate>
-                (new ReaderOptionsPrivate())) {
-    // PASS
   }
 
-  ReaderOptions::ReaderOptions(const ReaderOptions& rhs):
-    privateBits(std::unique_ptr<ReaderOptionsPrivate>
-                (new ReaderOptionsPrivate(*(rhs.privateBits.get())))) {
-    // PASS
-  }
-
-  ReaderOptions::ReaderOptions(ReaderOptions& rhs) {
-    // swap privateBits with rhs
-    ReaderOptionsPrivate* l = privateBits.release();
-    privateBits.reset(rhs.privateBits.release());
-    rhs.privateBits.reset(l);
-  }
-
-  ReaderOptions& ReaderOptions::operator=(const ReaderOptions& rhs) {
-    if (this != &rhs) {
-      privateBits.reset(new ReaderOptionsPrivate(*(rhs.privateBits.get())));
+  CompressionKind convertCompressionKind(const proto::PostScript& ps) {
+    if (ps.has_compression()) {
+      return static_cast<CompressionKind>(ps.compression());
+    } else {
+      throw ParseError("Unknown compression type");
     }
-    return *this;
-  }
-
-  ReaderOptions::~ReaderOptions() {
-    // PASS
-  }
-
-  ReaderOptions& ReaderOptions::include(const std::list<uint64_t>& include) {
-    privateBits->selection = ColumnSelection_FIELD_IDS;
-    privateBits->includedColumnIndexes.assign(include.begin(), include.end());
-    privateBits->includedColumnNames.clear();
-    return *this;
-  }
-
-  ReaderOptions& ReaderOptions::include(const std::list<std::string>& include) {
-    privateBits->selection = ColumnSelection_NAMES;
-    privateBits->includedColumnNames.assign(include.begin(), include.end());
-    privateBits->includedColumnIndexes.clear();
-    return *this;
-  }
-
-  ReaderOptions& ReaderOptions::includeTypes(const std::list<uint64_t>& types) {
-    privateBits->selection = ColumnSelection_TYPE_IDS;
-    privateBits->includedColumnIndexes.assign(types.begin(), types.end());
-    privateBits->includedColumnNames.clear();
-    return *this;
-  }
-
-  ReaderOptions& ReaderOptions::range(uint64_t offset,
-                                      uint64_t length) {
-    privateBits->dataStart = offset;
-    privateBits->dataLength = length;
-    return *this;
-  }
-
-  ReaderOptions& ReaderOptions::setTailLocation(uint64_t offset) {
-    privateBits->tailLocation = offset;
-    return *this;
-  }
-
-  ReaderOptions& ReaderOptions::setMemoryPool(MemoryPool& pool) {
-    privateBits->memoryPool = &pool;
-    return *this;
-  }
-
-  ReaderOptions& ReaderOptions::setSerializedFileTail(const std::string& value
-                                                      ) {
-    privateBits->serializedTail = value;
-    return *this;
-  }
-
-  MemoryPool* ReaderOptions::getMemoryPool() const{
-    return privateBits->memoryPool;
-  }
-
-  bool ReaderOptions::getIndexesSet() const {
-    return privateBits->selection == ColumnSelection_FIELD_IDS;
-  }
-
-  bool ReaderOptions::getTypeIdsSet() const {
-    return privateBits->selection == ColumnSelection_TYPE_IDS;
-  }
-
-  const std::list<uint64_t>& ReaderOptions::getInclude() const {
-    return privateBits->includedColumnIndexes;
-  }
-
-  bool ReaderOptions::getNamesSet() const {
-    return privateBits->selection == ColumnSelection_NAMES;
-  }
-
-  const std::list<std::string>& ReaderOptions::getIncludeNames() const {
-    return privateBits->includedColumnNames;
-  }
-
-  uint64_t ReaderOptions::getOffset() const {
-    return privateBits->dataStart;
-  }
-
-  uint64_t ReaderOptions::getLength() const {
-    return privateBits->dataLength;
   }
 
-  uint64_t ReaderOptions::getTailLocation() const {
-    return privateBits->tailLocation;
-  }
-
-  ReaderOptions& ReaderOptions::throwOnHive11DecimalOverflow(bool shouldThrow){
-    privateBits->throwOnHive11DecimalOverflow = shouldThrow;
-    return *this;
-  }
-
-  bool ReaderOptions::getThrowOnHive11DecimalOverflow() const {
-    return privateBits->throwOnHive11DecimalOverflow;
-  }
-
-  ReaderOptions& ReaderOptions::forcedScaleOnHive11Decimal(int32_t forcedScale
-                                                           ) {
-    privateBits->forcedScaleOnHive11Decimal = forcedScale;
-    return *this;
-  }
-
-  int32_t ReaderOptions::getForcedScaleOnHive11Decimal() const {
-    return privateBits->forcedScaleOnHive11Decimal;
-  }
-
-  ReaderOptions& ReaderOptions::setErrorStream(std::ostream& stream) {
-    privateBits->errorStream = &stream;
-    return *this;
-  }
-
-  std::ostream* ReaderOptions::getErrorStream() const {
-    return privateBits->errorStream;
-  }
-
-  std::string ReaderOptions::getSerializedFileTail() const {
-    return privateBits->serializedTail;
-  }
-
-  StreamInformation::~StreamInformation() {
-    // PASS
-  }
-
-  StripeInformation::~StripeInformation() {
-    // PASS
-  }
-
-  class ColumnStatisticsImpl: public ColumnStatistics {
-  private:
-    uint64_t valueCount;
-
-  public:
-    ColumnStatisticsImpl(const proto::ColumnStatistics& stats);
-    virtual ~ColumnStatisticsImpl();
-
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Column has " << valueCount << " values" << std::endl;
-      return buffer.str();
-    }
-  };
-
-  class BinaryColumnStatisticsImpl: public BinaryColumnStatistics {
-  private:
-    bool _hasTotalLength;
-    uint64_t valueCount;
-    uint64_t totalLength;
-
-  public:
-    BinaryColumnStatisticsImpl(const proto::ColumnStatistics& stats,
-                               bool correctStats);
-    virtual ~BinaryColumnStatisticsImpl();
-
-    bool hasTotalLength() const override {
-      return _hasTotalLength;
-    }
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    uint64_t getTotalLength() const override {
-      if(_hasTotalLength){
-        return totalLength;
-      }else{
-        throw ParseError("Total length is not defined.");
-      }
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Data type: Binary" << std::endl
-             << "Values: " << valueCount << std::endl;
-      if(_hasTotalLength){
-        buffer << "Total length: " << totalLength << std::endl;
-      }else{
-        buffer << "Total length: not defined" << std::endl;
-      }
-      return buffer.str();
-    }
-  };
-
-  class BooleanColumnStatisticsImpl: public BooleanColumnStatistics {
-  private:
-    bool _hasCount;
-    uint64_t valueCount;
-    uint64_t trueCount;
-
-  public:
-    BooleanColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats);
-    virtual ~BooleanColumnStatisticsImpl();
-
-    bool hasCount() const override {
-      return _hasCount;
-    }
-
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    uint64_t getFalseCount() const override {
-      if(_hasCount){
-        return valueCount - trueCount;
-      }else{
-        throw ParseError("False count is not defined.");
-      }
-    }
-
-    uint64_t getTrueCount() const override {
-      if(_hasCount){
-        return trueCount;
-      }else{
-        throw ParseError("True count is not defined.");
-      }
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Data type: Boolean" << std::endl
-             << "Values: " << valueCount << std::endl;
-      if(_hasCount){
-        buffer << "(true: " << trueCount << "; false: "
-               << valueCount - trueCount << ")" << std::endl;
-      } else {
-        buffer << "(true: not defined; false: not defined)" << std::endl;
-        buffer << "True and false count are not defined" << std::endl;
-      }
-      return buffer.str();
-    }
-  };
-
-  class DateColumnStatisticsImpl: public DateColumnStatistics {
-  private:
-    bool _hasMinimum;
-    bool _hasMaximum;
-    uint64_t valueCount;
-    int32_t minimum;
-    int32_t maximum;
-
-  public:
-    DateColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats);
-    virtual ~DateColumnStatisticsImpl();
-
-    bool hasMinimum() const override {
-      return _hasMinimum;
-    }
-
-    bool hasMaximum() const override {
-      return _hasMaximum;
-    }
-
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    int32_t getMinimum() const override {
-      if(_hasMinimum){
-        return minimum;
-      }else{
-        throw ParseError("Minimum is not defined.");
-      }
-    }
-
-    int32_t getMaximum() const override {
-      if(_hasMaximum){
-        return maximum;
-      }else{
-        throw ParseError("Maximum is not defined.");
-      }
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Data type: Date" << std::endl
-             << "Values: " << valueCount << std::endl;
-      if(_hasMinimum){
-        buffer << "Minimum: " << minimum << std::endl;
-      }else{
-        buffer << "Minimum: not defined" << std::endl;
-      }
-
-      if(_hasMaximum){
-        buffer << "Maximum: " << maximum << std::endl;
-      }else{
-        buffer << "Maximum: not defined" << std::endl;
-      }
-      return buffer.str();
-    }
-  };
-
-  class DecimalColumnStatisticsImpl: public DecimalColumnStatistics {
-  private:
-    bool _hasMinimum;
-    bool _hasMaximum;
-    bool _hasSum;
-    uint64_t valueCount;
-    std::string minimum;
-    std::string maximum;
-    std::string sum;
-
-  public:
-    DecimalColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats);
-    virtual ~DecimalColumnStatisticsImpl();
-
-    bool hasMinimum() const override {
-      return _hasMinimum;
-    }
-
-    bool hasMaximum() const override {
-      return _hasMaximum;
-    }
-
-    bool hasSum() const override {
-      return _hasSum;
-    }
-
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    Decimal getMinimum() const override {
-      if(_hasMinimum){
-        return Decimal(minimum);
-      }else{
-        throw ParseError("Minimum is not defined.");
-      }
-    }
-
-    Decimal getMaximum() const override {
-      if(_hasMaximum){
-        return Decimal(maximum);
-      }else{
-        throw ParseError("Maximum is not defined.");
-      }
-    }
-
-    Decimal getSum() const override {
-      if(_hasSum){
-        return Decimal(sum);
-      }else{
-        throw ParseError("Sum is not defined.");
-      }
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Data type: Decimal" << std::endl
-          << "Values: " << valueCount << std::endl;
-      if(_hasMinimum){
-        buffer << "Minimum: " << minimum << std::endl;
-      }else{
-        buffer << "Minimum: not defined" << std::endl;
-      }
-
-      if(_hasMaximum){
-        buffer << "Maximum: " << maximum << std::endl;
-      }else{
-        buffer << "Maximum: not defined" << std::endl;
-      }
-
-      if(_hasSum){
-        buffer << "Sum: " << sum << std::endl;
-      }else{
-        buffer << "Sum: not defined" << std::endl;
-      }
-
-      return buffer.str();
-    }
-  };
-
-  class DoubleColumnStatisticsImpl: public DoubleColumnStatistics {
-  private:
-    bool _hasMinimum;
-    bool _hasMaximum;
-    bool _hasSum;
-    uint64_t valueCount;
-    double minimum;
-    double maximum;
-    double sum;
-
-  public:
-    DoubleColumnStatisticsImpl(const proto::ColumnStatistics& stats);
-    virtual ~DoubleColumnStatisticsImpl();
-
-    bool hasMinimum() const override {
-      return _hasMinimum;
-    }
-
-    bool hasMaximum() const override {
-      return _hasMaximum;
-    }
-
-    bool hasSum() const override {
-      return _hasSum;
-    }
-
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    double getMinimum() const override {
-      if(_hasMinimum){
-        return minimum;
-      }else{
-        throw ParseError("Minimum is not defined.");
-      }
-    }
-
-    double getMaximum() const override {
-      if(_hasMaximum){
-        return maximum;
-      }else{
-        throw ParseError("Maximum is not defined.");
-      }
-    }
-
-    double getSum() const override {
-      if(_hasSum){
-        return sum;
-      }else{
-        throw ParseError("Sum is not defined.");
-      }
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Data type: Double" << std::endl
-          << "Values: " << valueCount << std::endl;
-      if(_hasMinimum){
-        buffer << "Minimum: " << minimum << std::endl;
-      }else{
-        buffer << "Minimum: not defined" << std::endl;
-      }
-
-      if(_hasMaximum){
-        buffer << "Maximum: " << maximum << std::endl;
-      }else{
-        buffer << "Maximum: not defined" << std::endl;
-      }
-
-      if(_hasSum){
-        buffer << "Sum: " << sum << std::endl;
-      }else{
-        buffer << "Sum: not defined" << std::endl;
-      }
-      return buffer.str();
-    }
-  };
-
-  class IntegerColumnStatisticsImpl: public IntegerColumnStatistics {
-  private:
-    bool _hasMinimum;
-    bool _hasMaximum;
-    bool _hasSum;
-    uint64_t valueCount;
-    int64_t minimum;
-    int64_t maximum;
-    int64_t sum;
-
-  public:
-    IntegerColumnStatisticsImpl(const proto::ColumnStatistics& stats);
-    virtual ~IntegerColumnStatisticsImpl();
-
-    bool hasMinimum() const override {
-      return _hasMinimum;
-    }
-
-    bool hasMaximum() const override {
-      return _hasMaximum;
-    }
-
-    bool hasSum() const override {
-      return _hasSum;
-    }
-
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    int64_t getMinimum() const override {
-      if(_hasMinimum){
-        return minimum;
-      }else{
-        throw ParseError("Minimum is not defined.");
-      }
-    }
-
-    int64_t getMaximum() const override {
-      if(_hasMaximum){
-        return maximum;
-      }else{
-        throw ParseError("Maximum is not defined.");
-      }
-    }
-
-    int64_t getSum() const override {
-      if(_hasSum){
-        return sum;
-      }else{
-        throw ParseError("Sum is not defined.");
-      }
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Data type: Integer" << std::endl
-          << "Values: " << valueCount << std::endl;
-      if(_hasMinimum){
-        buffer << "Minimum: " << minimum << std::endl;
-      }else{
-        buffer << "Minimum: not defined" << std::endl;
-      }
-
-      if(_hasMaximum){
-        buffer << "Maximum: " << maximum << std::endl;
-      }else{
-        buffer << "Maximum: not defined" << std::endl;
-      }
-
-      if(_hasSum){
-        buffer << "Sum: " << sum << std::endl;
-      }else{
-        buffer << "Sum: not defined" << std::endl;
-      }
-      return buffer.str();
-    }
-  };
-
-  class StringColumnStatisticsImpl: public StringColumnStatistics {
-  private:
-    bool _hasMinimum;
-    bool _hasMaximum;
-    bool _hasTotalLength;
-    uint64_t valueCount;
-    std::string minimum;
-    std::string maximum;
-    uint64_t totalLength;
-
-  public:
-    StringColumnStatisticsImpl(const proto::ColumnStatistics& stats, bool correctStats);
-    virtual ~StringColumnStatisticsImpl();
-
-    bool hasMinimum() const override {
-      return _hasMinimum;
-    }
-
-    bool hasMaximum() const override {
-      return _hasMaximum;
-    }
-
-    bool hasTotalLength() const override {
-      return _hasTotalLength;
-    }
-
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    std::string getMinimum() const override {
-      if(_hasMinimum){
-        return minimum;
-      }else{
-        throw ParseError("Minimum is not defined.");
-      }
-    }
-
-    std::string getMaximum() const override {
-      if(_hasMaximum){
-        return maximum;
-      }else{
-        throw ParseError("Maximum is not defined.");
-      }
-    }
-
-    uint64_t getTotalLength() const override {
-      if(_hasTotalLength){
-        return totalLength;
-      }else{
-        throw ParseError("Total length is not defined.");
-      }
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Data type: String" << std::endl
-          << "Values: " << valueCount << std::endl;
-      if(_hasMinimum){
-        buffer << "Minimum: " << minimum << std::endl;
-      }else{
-        buffer << "Minimum is not defined" << std::endl;
-      }
-
-      if(_hasMaximum){
-        buffer << "Maximum: " << maximum << std::endl;
-      }else{
-        buffer << "Maximum is not defined" << std::endl;
-      }
-
-      if(_hasTotalLength){
-        buffer << "Total length: " << totalLength << std::endl;
-      }else{
-        buffer << "Total length is not defined" << std::endl;
-      }
-      return buffer.str();
-    }
-  };
-
-  class TimestampColumnStatisticsImpl: public TimestampColumnStatistics {
-  private:
-    bool _hasMinimum;
-    bool _hasMaximum;
-    uint64_t valueCount;
-    int64_t minimum;
-    int64_t maximum;
-
-  public:
-    TimestampColumnStatisticsImpl(const proto::ColumnStatistics& stats,
-                                  bool correctStats);
-    virtual ~TimestampColumnStatisticsImpl();
-
-    bool hasMinimum() const override {
-      return _hasMinimum;
-    }
-
-    bool hasMaximum() const override {
-      return _hasMaximum;
-    }
-
-    uint64_t getNumberOfValues() const override {
-      return valueCount;
-    }
-
-    int64_t getMinimum() const override {
-      if(_hasMinimum){
-        return minimum;
-      }else{
-        throw ParseError("Minimum is not defined.");
-      }
-    }
-
-    int64_t getMaximum() const override {
-      if(_hasMaximum){
-        return maximum;
-      }else{
-        throw ParseError("Maximum is not defined.");
-      }
-    }
-
-    std::string toString() const override {
-      std::ostringstream buffer;
-      buffer << "Data type: Timestamp" << std::endl
-          << "Values: " << valueCount << std::endl;
-      if(_hasMinimum){
-        buffer << "Minimum: " << minimum << std::endl;
-      }else{
-        buffer << "Minimum is not defined" << std::endl;
-      }
-
-      if(_hasMaximum){
-        buffer << "Maximum: " << maximum << std::endl;
-      }else{
-        buffer << "Maximum is not defined" << std::endl;
+  std::string ColumnSelector::toDotColumnPath() {
+      if (columns.empty()) {
+          return std::string();
       }
-      return buffer.str();
-    }
-  };
-
-  std::string streamKindToString(StreamKind kind) {
-    switch (static_cast<int>(kind)) {
-    case StreamKind_PRESENT:
-      return "present";
-    case StreamKind_DATA:
-      return "data";
-    case StreamKind_LENGTH:
-      return "length";
-    case StreamKind_DICTIONARY_DATA:
-      return "dictionary";
-    case StreamKind_DICTIONARY_COUNT:
-      return "dictionary count";
-    case StreamKind_SECONDARY:
-      return "secondary";
-    case StreamKind_ROW_INDEX:
-      return "index";
-    case StreamKind_BLOOM_FILTER:
-      return "bloom";
-    }
-    std::stringstream buffer;
-    buffer << "unknown - " << kind;
-    return buffer.str();
-  }
-
-  std::string columnEncodingKindToString(ColumnEncodingKind kind) {
-    switch (static_cast<int>(kind)) {
-    case ColumnEncodingKind_DIRECT:
-      return "direct";
-    case ColumnEncodingKind_DICTIONARY:
-      return "dictionary";
-    case ColumnEncodingKind_DIRECT_V2:
-      return "direct rle2";
-    case ColumnEncodingKind_DICTIONARY_V2:
-      return "dictionary rle2";
-    }
-    std::stringstream buffer;
-    buffer << "unknown - " << kind;
-    return buffer.str();
-  }
-
-  class StreamInformationImpl: public StreamInformation {
-  private:
-    StreamKind kind;
-    uint64_t column;
-    uint64_t offset;
-    uint64_t length;
-  public:
-    StreamInformationImpl(uint64_t _offset,
-                          const proto::Stream& stream
-                          ): kind(static_cast<StreamKind>(stream.kind())),
-                             column(stream.column()),
-                             offset(_offset),
-                             length(stream.length()) {
-      // PASS
-    }
-
-    ~StreamInformationImpl();
-
-    StreamKind getKind() const override {
-      return kind;
-    }
-
-    uint64_t getColumnId() const override {
-      return column;
-    }
-
-    uint64_t getOffset() const override {
-      return offset;
-    }
-
-    uint64_t getLength() const override {
-      return length;
-    }
-  };
-
-  StreamInformationImpl::~StreamInformationImpl() {
-    // PASS
+      std::ostringstream columnStream;
+      std::copy(columns.begin(), columns.end(),
+              std::ostream_iterator<std::string>(columnStream, "."));
+      std::string columnPath = columnStream.str();
+      return columnPath.substr(0, columnPath.length() - 1);
   }
 
-  class StripeInformationImpl : public StripeInformation {
-    uint64_t offset;
-    uint64_t indexLength;
-    uint64_t dataLength;
-    uint64_t footerLength;
-    uint64_t numRows;
-    InputStream* stream;
-    MemoryPool& memory;
-    CompressionKind compression;
-    uint64_t blockSize;
-    mutable std::unique_ptr<proto::StripeFooter> stripeFooter;
-    void ensureStripeFooterLoaded() const;
-  public:
-
-    StripeInformationImpl(uint64_t _offset,
-                          uint64_t _indexLength,
-                          uint64_t _dataLength,
-                          uint64_t _footerLength,
-                          uint64_t _numRows,
-                          InputStream* _stream,
-                          MemoryPool& _memory,
-                          CompressionKind _compression,
-                          uint64_t _blockSize
-                          ) : offset(_offset),
-                              indexLength(_indexLength),
-                              dataLength(_dataLength),
-                              footerLength(_footerLength),
-                              numRows(_numRows),
-                              stream(_stream),
-                              memory(_memory),
-                              compression(_compression),
-                              blockSize(_blockSize) {
-      // PASS
-    }
-
-    virtual ~StripeInformationImpl() {
-      // PASS
-    }
-
-    uint64_t getOffset() const override {
-      return offset;
-    }
-
-    uint64_t getLength() const override {
-      return indexLength + dataLength + footerLength;
-    }
-    uint64_t getIndexLength() const override {
-      return indexLength;
-    }
-
-    uint64_t getDataLength()const override {
-      return dataLength;
-    }
-
-    uint64_t getFooterLength() const override {
-      return footerLength;
-    }
-
-    uint64_t getNumberOfRows() const override {
-      return numRows;
-    }
-
-    uint64_t getNumberOfStreams() const override {
-      ensureStripeFooterLoaded();
-      return static_cast<uint64_t>(stripeFooter->streams_size());
-    }
-
-    std::unique_ptr<StreamInformation> getStreamInformation(uint64_t streamId
-                                                            ) const override;
-
-    ColumnEncodingKind getColumnEncoding(uint64_t colId) const override {
-      ensureStripeFooterLoaded();
-      return static_cast<ColumnEncodingKind>(stripeFooter->
-                                             columns(static_cast<int>(colId))
-                                             .kind());
-    }
-
-    uint64_t getDictionarySize(uint64_t colId) const override {
-      ensureStripeFooterLoaded();
-      return static_cast<ColumnEncodingKind>(stripeFooter->
-                                             columns(static_cast<int>(colId))
-                                             .dictionarysize());
-    }
 
-    const std::string& getWriterTimezone() const override {
-      ensureStripeFooterLoaded();
-      return stripeFooter->writertimezone();
-    }
-  };
-
-  void StripeInformationImpl::ensureStripeFooterLoaded() const {
-    if (stripeFooter.get() == nullptr) {
-      std::unique_ptr<SeekableInputStream> pbStream =
-        createDecompressor(compression,
-                           std::unique_ptr<SeekableInputStream>
-                             (new SeekableFileInputStream(stream,
-                                                          offset +
-                                                            indexLength +
-                                                            dataLength,
-                                                          footerLength,
-                                                          memory)),
-                           blockSize,
-                           memory);
-      stripeFooter.reset(new proto::StripeFooter());
-      if (!stripeFooter->ParseFromZeroCopyStream(pbStream.get())) {
-        throw ParseError("Failed to parse the stripe footer");
+  void ColumnSelector::selectChildren(std::vector<bool>& selectedColumns, const Type& type) {
+    size_t id = static_cast<size_t>(type.getColumnId());
+    if (!selectedColumns[id]) {
+      selectedColumns[id] = true;
+      for(size_t c = id; c <= type.getMaximumColumnId(); ++c){
+        selectedColumns[c] = true;
       }
     }
   }
 
-  std::unique_ptr<StreamInformation>
-     StripeInformationImpl::getStreamInformation(uint64_t streamId) const {
-    ensureStripeFooterLoaded();
-    uint64_t streamOffset = offset;
-    for(uint64_t s=0; s < streamId; ++s) {
-      streamOffset += stripeFooter->streams(static_cast<int>(s)).length();
-    }
-    return ORC_UNIQUE_PTR<StreamInformation>
-      (new StreamInformationImpl(streamOffset,
-                                 stripeFooter->
-                                   streams(static_cast<int>(streamId))));
-  }
-
-  ColumnStatistics* convertColumnStatistics(const proto::ColumnStatistics& s,
-                                            bool correctStats) {
-    if (s.has_intstatistics()) {
-      return new IntegerColumnStatisticsImpl(s);
-    } else if (s.has_doublestatistics()) {
-      return new DoubleColumnStatisticsImpl(s);
-    } else if (s.has_stringstatistics()) {
-      return new StringColumnStatisticsImpl(s, correctStats);
-    } else if (s.has_bucketstatistics()) {
-      return new BooleanColumnStatisticsImpl(s, correctStats);
-    } else if (s.has_decimalstatistics()) {
-      return new DecimalColumnStatisticsImpl(s, correctStats);
-    } else if (s.has_timestampstatistics()) {
-      return new TimestampColumnStatisticsImpl(s, correctStats);
-    } else if (s.has_datestatistics()) {
-      return new DateColumnStatisticsImpl(s, correctStats);
-    } else if (s.has_binarystatistics()) {
-      return new BinaryColumnStatisticsImpl(s, correctStats);
-    } else {
-      return new ColumnStatisticsImpl(s);
-    }
-  }
-
-  Statistics::~Statistics() {
-    // PASS
-  }
-
-  class StatisticsImpl: public Statistics {
-  private:
-    std::list<ColumnStatistics*> colStats;
-
-    // DELIBERATELY NOT IMPLEMENTED
-    StatisticsImpl(const StatisticsImpl&);
-    StatisticsImpl& operator=(const StatisticsImpl&);
-
-  public:
-    StatisticsImpl(const proto::StripeStatistics& stripeStats, bool correctStats) {
-      for(int i = 0; i < stripeStats.colstats_size(); i++) {
-        colStats.push_back(convertColumnStatistics
-                           (stripeStats.colstats(i), correctStats));
-      }
-    }
-
-    StatisticsImpl(const proto::Footer& footer, bool correctStats) {
-      for(int i = 0; i < footer.statistics_size(); i++) {
-        colStats.push_back(convertColumnStatistics
-                           (footer.statistics(i), correctStats));
-      }
-    }
-
-    virtual const ColumnStatistics* getColumnStatistics(uint32_t columnId
-                                                        ) const override {
-      std::list<ColumnStatistics*>::const_iterator it = colStats.begin();
-      std::advance(it, static_cast<int64_t>(columnId));
-      return *it;
-    }
-
-    virtual ~StatisticsImpl();
-
-    uint32_t getNumberOfColumns() const override {
-      return static_cast<uint32_t>(colStats.size());
-    }
-  };
-
-  StatisticsImpl::~StatisticsImpl() {
-    for(std::list<ColumnStatistics*>::iterator ptr = colStats.begin();
-        ptr != colStats.end();
-        ++ptr) {
-      delete *ptr;
+  /**
+   * Recurses over a type tree and selects the parents of every selected type.
+   * @return true if any child was selected.
+   */
+  bool ColumnSelector::selectParents(std::vector<bool>& selectedColumns, const Type& type) {
+    size_t id = static_cast<size_t>(type.getColumnId());
+    bool result = selectedColumns[id];
+    for(uint64_t c=0; c < type.getSubtypeCount(); ++c) {
+      result |= selectParents(selectedColumns, *type.getSubtype(c));
     }
+    selectedColumns[id] = result;
+    return result;
   }
 
-  Reader::~Reader() {
-    // PASS
-  }
-
-  static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
-
-  class ReaderImpl : public Reader {
-  private:
-    const Timezone& localTimezone;
-
-    // inputs
-    std::unique_ptr<InputStream> stream;
-    ReaderOptions options;
-    const uint64_t fileLength;
-    const uint64_t postscriptLength;
-    std::vector<bool> selectedColumns;
-
-    // custom memory pool
-    MemoryPool& memoryPool;
-
-    // postscript
-    std::unique_ptr<proto::PostScript> postscript;
-    const uint64_t blockSize;
-    const CompressionKind compression;
-
-    // footer
-    std::unique_ptr<proto::Footer> footer;
-    DataBuffer<uint64_t> firstRowOfStripe;
-    uint64_t numberOfStripes;
-    std::unique_ptr<Type> schema;
-    mutable std::unique_ptr<Type> selectedSchema;
-
-    // metadata
-    mutable std::unique_ptr<proto::Metadata> metadata;
-    mutable bool isMetadataLoaded;
-
-    // reading state
-    uint64_t previousRow;
-    uint64_t firstStripe;
-    uint64_t currentStripe;
-    uint64_t lastStripe; // the stripe AFTER the last one
-    uint64_t currentRowInStripe;
-    uint64_t rowsInCurrentStripe;
-    proto::StripeInformation currentStripeInfo;
-    proto::StripeFooter currentStripeFooter;
-    std::unique_ptr<ColumnReader> reader;
-    std::map<std::string, uint64_t> nameIdMap;
-    std::map<uint64_t, const Type*> idTypeMap;
-
-    // internal methods
-    proto::StripeFooter getStripeFooter(const proto::StripeInformation& info);
-    void startNextStripe();
-    void checkOrcVersion();
-    void readMetadata() const;
-
-    // build map from type name and id, id to Type
-    void buildTypeNameIdMap(const Type* type, std::vector<std::string>& columns);
-    std::string toDotColumnPath(const std::vector<std::string>& columns);
-
-    // Select the columns from the options object
-    void updateSelected();
-
-    // Select a field by name
-    void updateSelectedByName(const std::string& name);
-    // Select a field by id
-    void updateSelectedByFieldId(uint64_t fieldId);
-    // Select a type by id
-    void updateSelectedByTypeId(uint64_t typeId);
-
-    // Select all of the recursive children of the given type.
-    void selectChildren(const Type& type);
-
-    // For each child of type, select it if one of its children
-    // is selected.
-    bool selectParents(const Type& type);
-
-  public:
-    /**
-     * Constructor that lets the user specify additional options.
-     * @param stream the stream to read from
-     * @param options options for reading
-     * @param postscript the postscript for the file
-     * @param footer the footer for the file
-     * @param fileLength the length of the file in bytes
-     * @param postscriptLength the length of the postscript in bytes
-     */
-    ReaderImpl(std::unique_ptr<InputStream> stream,
-               const ReaderOptions& options,
-               std::unique_ptr<proto::PostScript> postscript,
-               std::unique_ptr<proto::Footer> footer,
-               uint64_t fileLength,
-               uint64_t postscriptLength);
-
-    const ReaderOptions& getReaderOptions() const;
-
-    CompressionKind getCompression() const override;
-
-    std::string getFormatVersion() const override;
-
-    WriterVersion getWriterVersion() const override;
-
-    uint64_t getNumberOfRows() const override;
-
-    uint64_t getRowIndexStride() const override;
-
-    const std::string& getStreamName() const override;
-
-    std::list<std::string> getMetadataKeys() const override;
-
-    std::string getMetadataValue(const std::string& key) const override;
-
-    bool hasMetadataValue(const std::string& key) const override;
-
-    uint64_t getCompressionSize() const override;
-
-    uint64_t getNumberOfStripes() const override;
-
-    std::unique_ptr<StripeInformation> getStripe(uint64_t
-                                                 ) const override;
-
-    uint64_t getNumberOfStripeStatistics() const override;
-
-    std::unique_ptr<Statistics>
-    getStripeStatistics(uint64_t stripeIndex) const override;
-
-
-    uint64_t getContentLength() const override;
-    uint64_t getStripeStatisticsLength() const override;
-    uint64_t getFileFooterLength() const override;
-    uint64_t getFilePostscriptLength() const override;
-    uint64_t getFileLength() const override;
-
-    std::unique_ptr<Statistics> getStatistics() const override;
-
-    std::unique_ptr<ColumnStatistics> getColumnStatistics(uint32_t columnId
-                                                          ) const override;
-
-    const Type& getType() const override;
-
-    const Type& getSelectedType() const override;
-
-    const std::vector<bool> getSelectedColumns() const override;
-
-    std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size
-                                                      ) const override;
-
-    bool next(ColumnVectorBatch& data) override;
-
-    uint64_t getRowNumber() const override;
-
-    void seekToRow(uint64_t rowNumber) override;
-
-    MemoryPool* getMemoryPool() const ;
-
-    bool hasCorrectStatistics() const override;
+  /**
+   * Recurses over a type tree and build two maps
+   * map<TypeName, TypeId>, map<TypeId, Type>
+   */
+  void ColumnSelector::buildTypeNameIdMap(const Type* type) {
+    // map<type_id, Type*>
+    idTypeMap[type->getColumnId()] = type;
 
-    std::string getSerializedFileTail() const override;
+    if (STRUCT == type->getKind()) {
+      for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
+        const std::string& fieldName = type->getFieldName(i);
+        columns.push_back(fieldName);
+        nameIdMap[toDotColumnPath()] = type->getSubtype(i)->getColumnId();
+        buildTypeNameIdMap(type->getSubtype(i));
+        columns.pop_back();
+      }
+    } else {
+      // other non-primitive type
+      for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
+        buildTypeNameIdMap(type->getSubtype(j));
+      }
+    }
+  }
 
-    uint64_t getMemoryUse(int stripeIx = -1) override;
-  };
+  void ColumnSelector::updateSelected(std::vector<bool>& selectedColumns, const RowReaderOptions& options) {
+    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
+    if (contents->schema->getKind() == STRUCT && options.getIndexesSet()) {
+      for(std::list<uint64_t>::const_iterator field = options.getInclude().begin();
+          field != options.getInclude().end(); ++field) {
+        updateSelectedByFieldId(selectedColumns, *field);
+      }
+    } else if (contents->schema->getKind() == STRUCT && options.getNamesSet()) {
+      for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
+          field != options.getIncludeNames().end(); ++field) {
+        updateSelectedByName(selectedColumns, *field);
+      }
+    } else if (options.getTypeIdsSet()) {
+      for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
+          typeId != options.getInclude().end(); ++typeId) {
+        updateSelectedByTypeId(selectedColumns, *typeId);
+      }
+    } else {
+      // default is to select all columns
+      std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+    }
+    selectParents(selectedColumns, *contents->schema.get());
+    selectedColumns[0] = true; // column 0 is selected by default
+  }
 
-  InputStream::~InputStream() {
-    // PASS
-  };
+  void ColumnSelector::updateSelectedByFieldId(std::vector<bool>& selectedColumns, uint64_t fieldId) {
+    if (fieldId < contents->schema->getSubtypeCount()) {
+      selectChildren(selectedColumns, *contents->schema->getSubtype(fieldId));
+    } else {
+      std::stringstream buffer;
+      buffer << "Invalid column selected " << fieldId << " out of "
+             << contents->schema->getSubtypeCount();
+      throw ParseError(buffer.str());
+    }
+  }
 
-  uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
-    if (ps.has_compressionblocksize()) {
-      return ps.compressionblocksize();
+  void ColumnSelector::updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId) {
+    if (typeId < selectedColumns.size()) {
+      const Type& type = *idTypeMap[typeId];
+      selectChildren(selectedColumns, type);
     } else {
-      return 256 * 1024;
+      std::stringstream buffer;
+      buffer << "Invalid type id selected " << typeId << " out of "
+             << selectedColumns.size();
+      throw ParseError(buffer.str());
     }
   }
 
-  CompressionKind convertCompressionKind(const proto::PostScript& ps) {
-    if (ps.has_compression()) {
-      return static_cast<CompressionKind>(ps.compression());
+  void ColumnSelector::updateSelectedByName(std::vector<bool>& selectedColumns, const std::string& fieldName) {
+    std::map<std::string, uint64_t>::const_iterator ite = nameIdMap.find(fieldName);
+    if (ite != nameIdMap.end()) {
+      updateSelectedByTypeId(selectedColumns, ite->second);
     } else {
-      throw ParseError("Unknown compression type");
+      throw ParseError("Invalid column selected " + fieldName);
     }
   }
 
-  ReaderImpl::ReaderImpl(std::unique_ptr<InputStream> input,
-                         const ReaderOptions& opts,
-                         std::unique_ptr<proto::PostScript> _postscript,
-                         std::unique_ptr<proto::Footer> _footer,
-                         uint64_t _fileLength,
-                         uint64_t _postscriptLength
+  ColumnSelector::ColumnSelector(const FileContents* _contents): contents(_contents) {
+    buildTypeNameIdMap(contents->schema.get());
+  }
+
+  RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
+                            const RowReaderOptions& opts
                          ): localTimezone(getLocalTimezone()),
-                            stream(std::move(input)),
+                            contents(_contents),
                             options(opts),
-                            fileLength(_fileLength),
-                            postscriptLength(_postscriptLength),
                             memoryPool(*opts.getMemoryPool()),
-                            postscript(std::move(_postscript)),
-                            blockSize(getCompressionBlockSize(*postscript)),
-                            compression(convertCompressionKind(*postscript)),
-                            footer(std::move(_footer)),
+                            footer(contents->footer.get()),
                             firstRowOfStripe(memoryPool, 0) {
-    isMetadataLoaded = false;
-    checkOrcVersion();
+    uint64_t numberOfStripes;
     numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
-    currentStripe = static_cast<uint64_t>(footer->stripes_size());
+    currentStripe = numberOfStripes;
     lastStripe = 0;
     currentRowInStripe = 0;
     uint64_t rowTotal = 0;
 
-    firstRowOfStripe.resize(static_cast<uint64_t>(footer->stripes_size()));
-    for(size_t i=0; i < static_cast<size_t>(footer->stripes_size()); ++i) {
+    firstRowOfStripe.resize(numberOfStripes);
+    for(size_t i=0; i < numberOfStripes; ++i) {
       firstRowOfStripe[i] = rowTotal;
       proto::StripeInformation stripeInfo =
         footer->stripes(static_cast<int>(i));
@@ -1306,48 +248,130 @@ namespace orc {
 
     if (currentStripe == 0) {
       previousRow = (std::numeric_limits<uint64_t>::max)();
-    } else if (currentStripe ==
-               static_cast<uint64_t>(footer->stripes_size())) {
+    } else if (currentStripe == numberOfStripes) {
       previousRow = footer->numberofrows();
     } else {
       previousRow = firstRowOfStripe[firstStripe]-1;
     }
 
-    schema = convertType(footer->types(0), *footer);
-    std::vector<std::string> columns;
-    buildTypeNameIdMap(schema.get(), columns);
-    updateSelected();
+    ColumnSelector column_selector(contents.get());
+    column_selector.updateSelected(selectedColumns, options);
   }
 
-  void ReaderImpl::updateSelected() {
-    selectedColumns.assign(static_cast<size_t>(footer->types_size()), false);
-    if (schema->getKind() == STRUCT && options.getIndexesSet()) {
-      for(std::list<uint64_t>::const_iterator field = options.getInclude().begin();
-          field != options.getInclude().end(); ++field) {
-        updateSelectedByFieldId(*field);
-      }
-    } else if (schema->getKind() == STRUCT && options.getNamesSet()) {
-      for(std::list<std::string>::const_iterator field = options.getIncludeNames().begin();
-          field != options.getIncludeNames().end(); ++field) {
-        updateSelectedByName(*field);
-      }
-    } else if (options.getTypeIdsSet()) {
-      for(std::list<uint64_t>::const_iterator typeId = options.getInclude().begin();
-          typeId != options.getInclude().end(); ++typeId) {
-        updateSelectedByTypeId(*typeId);
-      }
-    } else {
-      // default is to select all columns
-      std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+  const RowReaderOptions& RowReaderImpl::getRowReaderOptions() const {
+    return options;
+  }
+
+  CompressionKind RowReaderImpl::getCompression() const {
+    return contents->compression;
+  }
+
+  uint64_t RowReaderImpl::getCompressionSize() const {
+    return contents->blockSize;
+  }
+
+  const std::vector<bool> RowReaderImpl::getSelectedColumns() const {
+    return selectedColumns;
+  }
+
+  const Type& RowReaderImpl::getSelectedType() const {
+    if (selectedSchema.get() == nullptr) {
+      selectedSchema = buildSelectedType(contents->schema.get(),
+                                         selectedColumns);
     }
-    selectParents(*schema);
-    selectedColumns[0] = true; // column 0 is selected by default
+    return *(selectedSchema.get());
+  }
+
+  uint64_t RowReaderImpl::getRowNumber() const {
+    return previousRow;
+  }
+
+  void RowReaderImpl::seekToRow(uint64_t rowNumber) {
+    // Empty file
+    if (lastStripe == 0) {
+      return;
+    }
+
+    // If we are reading only a portion of the file
+    // (bounded by firstStripe and lastStripe),
+    // seeking before or after the portion of interest should return no data.
+    // Implement this by setting previousRow to the number of rows in the file.
+
+    // seeking past lastStripe
+    uint64_t num_stripes = static_cast<uint64_t>(footer->stripes_size());
+    if ( (lastStripe == num_stripes
+            && rowNumber >= footer->numberofrows())  ||
+         (lastStripe < num_stripes
+            && rowNumber >= firstRowOfStripe[lastStripe])   ) {
+      currentStripe = num_stripes;
+      previousRow = footer->numberofrows();
+      return;
+    }
+
+    uint64_t seekToStripe = 0;
+    while (seekToStripe+1 < lastStripe &&
+                  firstRowOfStripe[seekToStripe+1] <= rowNumber) {
+      seekToStripe++;
+    }
+
+    // seeking before the first stripe
+    if (seekToStripe < firstStripe) {
+      currentStripe = num_stripes;
+      previousRow = footer->numberofrows();
+      return;
+    }
+
+    currentStripe = seekToStripe;
+    currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
+    previousRow = rowNumber;
+    startNextStripe();
+    reader->skip(currentRowInStripe);
+  }
+
+  proto::StripeFooter RowReaderImpl::getStripeFooter
+       (const proto::StripeInformation& info) {
+    uint64_t stripeFooterStart = info.offset() + info.indexlength() +
+      info.datalength();
+    uint64_t stripeFooterLength = info.footerlength();
+    std::unique_ptr<SeekableInputStream> pbStream =
+      createDecompressor(contents->compression,
+                         std::unique_ptr<SeekableInputStream>
+                         (new SeekableFileInputStream(contents->stream.get(),
+                                                      stripeFooterStart,
+                                                      stripeFooterLength,
+                                                      memoryPool)),
+                         contents->blockSize,
+                         memoryPool);
+    proto::StripeFooter result;
+    if (!result.ParseFromZeroCopyStream(pbStream.get())) {
+      throw ParseError(std::string("bad StripeFooter from ") +
+                       pbStream->getName());
+    }
+    return result;
+  }
+
+  ReaderImpl::ReaderImpl(std::shared_ptr<FileContents> _contents,
+                         const ReaderOptions& opts,
+                         uint64_t _fileLength,
+                         uint64_t _postscriptLength
+                         ): contents(std::move(_contents)),
+                            options(opts),
+                            fileLength(_fileLength),
+                            postscriptLength(_postscriptLength),
+                            memoryPool(*opts.getMemoryPool()),
+                            footer(contents->footer.get()) {
+    isMetadataLoaded = false;
+    checkOrcVersion();
+    numberOfStripes = static_cast<uint64_t>(footer->stripes_size());
+    contents->schema = REDUNDANT_MOVE(convertType(footer->types(0), *footer));
+    contents->blockSize = getCompressionBlockSize(*contents->postscript);
+    contents->compression= convertCompressionKind(*contents->postscript);
   }
 
   std::string ReaderImpl::getSerializedFileTail() const {
     proto::FileTail tail;
     proto::PostScript *mutable_ps = tail.mutable_postscript();
-    mutable_ps->CopyFrom(*postscript);
+    mutable_ps->CopyFrom(*contents->postscript);
     proto::Footer *mutableFooter = tail.mutable_footer();
     mutableFooter->CopyFrom(*footer);
     tail.set_filelength(fileLength);
@@ -1364,11 +388,11 @@ namespace orc {
   }
 
   CompressionKind ReaderImpl::getCompression() const {
-    return compression;
+    return contents->compression;
   }
 
   uint64_t ReaderImpl::getCompressionSize() const {
-    return blockSize;
+    return contents->blockSize;
   }
 
   uint64_t ReaderImpl::getNumberOfStripes() const {
@@ -1398,19 +422,19 @@ namespace orc {
         stripeInfo.datalength(),
         stripeInfo.footerlength(),
         stripeInfo.numberofrows(),
-        stream.get(),
+        contents->stream.get(),
         memoryPool,
-        compression,
-        blockSize));
+        contents->compression,
+        contents->blockSize));
   }
 
   std::string ReaderImpl::getFormatVersion() const {
     std::stringstream result;
-    for(int i=0; i < postscript->version_size(); ++i) {
+    for(int i=0; i < contents->postscript->version_size(); ++i) {
       if (i != 0) {
         result << ".";
       }
-      result << postscript->version(i);
+      result << contents->postscript->version(i);
     }
     return result.str();
   }
@@ -1420,10 +444,10 @@ namespace orc {
   }
 
   WriterVersion ReaderImpl::getWriterVersion() const {
-    if (!postscript->has_writerversion()) {
+    if (!contents->postscript->has_writerversion()) {
       return WriterVersion_ORIGINAL;
     }
-    return static_cast<WriterVersion>(postscript->writerversion());
+    return static_cast<WriterVersion>(contents->postscript->writerversion());
   }
 
   uint64_t ReaderImpl::getContentLength() const {
@@ -1431,11 +455,11 @@ namespace orc {
   }
 
   uint64_t ReaderImpl::getStripeStatisticsLength() const {
-    return postscript->metadatalength();
+    return contents->postscript->metadatalength();
   }
 
   uint64_t ReaderImpl::getFileFooterLength() const {
-    return postscript->footerlength();
+    return contents->postscript->footerlength();
   }
 
   uint64_t ReaderImpl::getFilePostscriptLength() const {
@@ -1451,7 +475,7 @@ namespace orc {
   }
 
   const std::string& ReaderImpl::getStreamName() const {
-    return stream->getName();
+    return contents->stream->getName();
   }
 
   std::list<std::string> ReaderImpl::getMetadataKeys() const {
@@ -1480,24 +504,22 @@ namespace orc {
     return false;
   }
 
-  const std::vector<bool> ReaderImpl::getSelectedColumns() const {
-    return selectedColumns;
-  }
-
   const Type& ReaderImpl::getType() const {
-    return *(schema.get());
+    return *(contents->schema.get());
   }
 
-  const Type& ReaderImpl::getSelectedType() const {
-    if (selectedSchema.get() == nullptr) {
-      selectedSchema = buildSelectedType(schema.get(),
-                                         selectedColumns);
+  std::unique_ptr<Statistics>
+  ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
+    if (!isMetadataLoaded) {
+      readMetadata();
     }
-    return *(selectedSchema.get());
-  }
-
-  uint64_t ReaderImpl::getRowNumber() const {
-    return previousRow;
+    if (metadata.get() == nullptr) {
+      throw std::logic_error("No stripe statistics in file");
+    }
+    return std::unique_ptr<Statistics>
+      (new StatisticsImpl(metadata->stripestats
+                          (static_cast<int>(stripeIndex)),
+                          hasCorrectStatistics()));
   }
 
   std::unique_ptr<Statistics> ReaderImpl::getStatistics() const {
@@ -1518,18 +540,18 @@ namespace orc {
   }
 
   void ReaderImpl::readMetadata() const {
-    uint64_t metadataSize = postscript->metadatalength();
+    uint64_t metadataSize = contents->postscript->metadatalength();
     uint64_t metadataStart = fileLength - metadataSize
-      - postscript->footerlength() - postscriptLength - 1;
+      - contents->postscript->footerlength() - postscriptLength - 1;
     if (metadataSize != 0) {
       std::unique_ptr<SeekableInputStream> pbStream =
-        createDecompressor(compression,
+        createDecompressor(contents->compression,
                            std::unique_ptr<SeekableInputStream>
-                             (new SeekableFileInputStream(stream.get(),
+                             (new SeekableFileInputStream(contents->stream.get(),
                                                           metadataStart,
                                                           metadataSize,
                                                           memoryPool)),
-                           blockSize,
+                           contents->blockSize,
                            memoryPool);
       metadata.reset(new proto::Metadata());
       if (!metadata->ParseFromZeroCopyStream(pbStream.get())) {
@@ -1537,125 +559,26 @@ namespace orc {
       }
     }
     isMetadataLoaded = true;
-  }
-
-  std::unique_ptr<Statistics>
-  ReaderImpl::getStripeStatistics(uint64_t stripeIndex) const {
-    if (!isMetadataLoaded) {
-      readMetadata();
-    }
-    if (metadata.get() == nullptr) {
-      throw std::logic_error("No stripe statistics in file");
-    }
-    return std::unique_ptr<Statistics>
-      (new StatisticsImpl(metadata->stripestats
-                          (static_cast<int>(stripeIndex)),
-                          hasCorrectStatistics()));
-  }
-
-
-  void ReaderImpl::seekToRow(uint64_t rowNumber) {
-    // Empty file
-    if (lastStripe == 0) {
-      return;
-    }
-
-    // If we are reading only a portion of the file
-    // (bounded by firstStripe and lastStripe),
-    // seeking before or after the portion of interest should return no data.
-    // Implement this by setting previousRow to the number of rows in the file.
-
-    // seeking past lastStripe
-    if ( (lastStripe == static_cast<uint64_t>(footer->stripes_size())
-            && rowNumber >= footer->numberofrows())  ||
-         (lastStripe < static_cast<uint64_t>(footer->stripes_size())
-            && rowNumber >= firstRowOfStripe[lastStripe])   ) {
-      currentStripe = static_cast<uint64_t>(footer->stripes_size());
-      previousRow = footer->numberofrows();
-      return;
-    }
-
-    uint64_t seekToStripe = 0;
-    while (seekToStripe+1 < lastStripe &&
-                  firstRowOfStripe[seekToStripe+1] <= rowNumber) {
-      seekToStripe++;
-    }
-
-    // seeking before the first stripe
-    if (seekToStripe < firstStripe) {
-      currentStripe = static_cast<uint64_t>(footer->stripes_size());
-      previousRow = footer->numberofrows();
-      return;
-    }
-
-    currentStripe = seekToStripe;
-    currentRowInStripe = rowNumber - firstRowOfStripe[currentStripe];
-    previousRow = rowNumber;
-    startNextStripe();
-    reader->skip(currentRowInStripe);
-  }
-
-  bool ReaderImpl::hasCorrectStatistics() const {
-    return getWriterVersion() != WriterVersion_ORIGINAL;
-  }
-
-  proto::StripeFooter ReaderImpl::getStripeFooter
-       (const proto::StripeInformation& info) {
-    uint64_t stripeFooterStart = info.offset() + info.indexlength() +
-      info.datalength();
-    uint64_t stripeFooterLength = info.footerlength();
-    std::unique_ptr<SeekableInputStream> pbStream =
-      createDecompressor(compression,
-                         std::unique_ptr<SeekableInputStream>
-                         (new SeekableFileInputStream(stream.get(),
-                                                      stripeFooterStart,
-                                                      stripeFooterLength,
-                                                      memoryPool)),
-                         blockSize,
-                         memoryPool);
-    proto::StripeFooter result;
-    if (!result.ParseFromZeroCopyStream(pbStream.get())) {
-      throw ParseError(std::string("bad StripeFooter from ") +
-                       pbStream->getName());
-    }
-    return result;
-  }
-
-  class StripeStreamsImpl: public StripeStreams {
-  private:
-    const ReaderImpl& reader;
-    const proto::StripeFooter& footer;
-    const uint64_t stripeStart;
-    InputStream& input;
-    MemoryPool& memoryPool;
-    const Timezone& writerTimezone;
-
-  public:
-    StripeStreamsImpl(const ReaderImpl& reader,
-                      const proto::StripeFooter& footer,
-                      uint64_t stripeStart,
-                      InputStream& input,
-                      MemoryPool& memoryPool,
-                      const Timezone& writerTimezone);
-
-    virtual ~StripeStreamsImpl();
-
-    virtual const ReaderOptions& getReaderOptions() const override;
-
-    virtual const std::vector<bool> getSelectedColumns() const override;
-
-    virtual proto::ColumnEncoding getEncoding(uint64_t columnId
-                                              ) const override;
+  }
 
-    virtual std::unique_ptr<SeekableInputStream>
-    getStream(uint64_t columnId,
-              proto::Stream_Kind kind,
-              bool shouldStream) const override;
+  bool ReaderImpl::hasCorrectStatistics() const {
+    return getWriterVersion() != WriterVersion_ORIGINAL;
+  }
 
-    MemoryPool& getMemoryPool() const override;
+  void ReaderImpl::checkOrcVersion() {
+    std::string version = getFormatVersion();
+    if (version != "0.11" && version != "0.12") {
+      *(options.getErrorStream())
+        << "Warning: ORC file " << contents->stream->getName()
+        << " was written in an unknown format version "
+        << version << "\n";
+    }
+  }
 
-    const Timezone& getWriterTimezone() const override;
-  };
+  std::unique_ptr<RowReader> ReaderImpl::getRowReader(
+           const RowReaderOptions& opts) const {
+    return std::unique_ptr<RowReader>(new RowReaderImpl(contents, opts));
+  }
 
   uint64_t maxStreamsForType(const proto::Type& type) {
     switch (static_cast<int64_t>(type.kind())) {
@@ -1687,6 +610,66 @@ namespace orc {
   }
 
   uint64_t ReaderImpl::getMemoryUse(int stripeIx) {
+    std::vector<bool> selectedColumns;
+    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), true);
+    return getMemoryUse(stripeIx, selectedColumns);
+  }
+
+  uint64_t ReaderImpl::getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx) {
+    std::vector<bool> selectedColumns;
+    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
+    ColumnSelector column_selector(contents.get());
+    if (contents->schema->getKind() == STRUCT && include.begin() != include.end()) {
+      for(std::list<uint64_t>::const_iterator field = include.begin();
+          field != include.end(); ++field) {
+        column_selector.updateSelectedByFieldId(selectedColumns, *field);
+      }
+    } else {
+      // default is to select all columns
+      std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+    }
+    column_selector.selectParents(selectedColumns, *contents->schema.get());
+    selectedColumns[0] = true; // column 0 is selected by default
+    return getMemoryUse(stripeIx, selectedColumns);
+  }
+
+  uint64_t ReaderImpl::getMemoryUseByName(const std::list<std::string>& names, int stripeIx) {
+    std::vector<bool> selectedColumns;
+    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
+    ColumnSelector column_selector(contents.get());
+    if (contents->schema->getKind() == STRUCT && names.begin() != names.end()) {
+      for(std::list<std::string>::const_iterator field = names.begin();
+          field != names.end(); ++field) {
+        column_selector.updateSelectedByName(selectedColumns, *field);
+      }
+    } else {
+      // default is to select all columns
+      std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+    }
+    column_selector.selectParents(selectedColumns, *contents->schema.get());
+    selectedColumns[0] = true; // column 0 is selected by default
+    return getMemoryUse(stripeIx, selectedColumns);
+  }
+
+  uint64_t ReaderImpl::getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx) {
+    std::vector<bool> selectedColumns;
+    selectedColumns.assign(static_cast<size_t>(contents->footer->types_size()), false);
+    ColumnSelector column_selector(contents.get());
+    if (include.begin() != include.end()) {
+      for(std::list<uint64_t>::const_iterator field = include.begin();
+          field != include.end(); ++field) {
+        column_selector.updateSelectedByTypeId(selectedColumns, *field);
+      }
+    } else {
+      // default is to select all columns
+      std::fill(selectedColumns.begin(), selectedColumns.end(), true);
+    }
+    column_selector.selectParents(selectedColumns, *contents->schema.get());
+    selectedColumns[0] = true; // column 0 is selected by default
+    return getMemoryUse(stripeIx, selectedColumns);
+  }
+
+  uint64_t ReaderImpl::getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns) {
     uint64_t maxDataLength = 0;
 
     if (stripeIx >= 0 && stripeIx < footer->stripes_size()) {
@@ -1732,29 +715,29 @@ namespace orc {
      */
     uint64_t memory = hasStringColumn ? 2 * maxDataLength :
         std::min(uint64_t(maxDataLength),
-                 nSelectedStreams * stream->getNaturalReadSize());
+                 nSelectedStreams * contents->stream->getNaturalReadSize());
 
     // Do we need even more memory to read the footer or the metadata?
-    if (memory < postscript->footerlength() + DIRECTORY_SIZE_GUESS) {
-      memory =  postscript->footerlength() + DIRECTORY_SIZE_GUESS;
+    if (memory < contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS) {
+      memory =  contents->postscript->footerlength() + DIRECTORY_SIZE_GUESS;
     }
-    if (memory < postscript->metadatalength()) {
-      memory =  postscript->metadatalength();
+    if (memory < contents->postscript->metadatalength()) {
+      memory =  contents->postscript->metadatalength();
     }
 
     // Account for firstRowOfStripe.
-    memory += firstRowOfStripe.capacity() * sizeof(uint64_t);
+    memory += static_cast<uint64_t>(footer->stripes_size()) * sizeof(uint64_t);
 
     // Decompressors need buffers for each stream
     uint64_t decompressorMemory = 0;
-    if (compression != CompressionKind_NONE) {
+    if (contents->compression != CompressionKind_NONE) {
       for (int i=0; i < footer->types_size(); i++) {
         if (selectedColumns[static_cast<size_t>(i)]) {
           const proto::Type& type = footer->types(i);
-          decompressorMemory += maxStreamsForType(type) * blockSize;
+          decompressorMemory += maxStreamsForType(type) * contents->blockSize;
         }
       }
-      if (compression == CompressionKind_SNAPPY) {
+      if (contents->compression == CompressionKind_SNAPPY) {
         decompressorMemory *= 2;  // Snappy decompressor uses a second buffer
       }
     }
@@ -1762,75 +745,7 @@ namespace orc {
     return memory + decompressorMemory ;
   }
 
-  StripeStreamsImpl::StripeStreamsImpl(const ReaderImpl& _reader,
-                                       const proto::StripeFooter& _footer,
-                                       uint64_t _stripeStart,
-                                       InputStream& _input,
-                                       MemoryPool& _memoryPool,
-                                       const Timezone& _writerTimezone
-                                       ): reader(_reader),
-                                          footer(_footer),
-                                          stripeStart(_stripeStart),
-                                          input(_input),
-                                          memoryPool(_memoryPool),
-                                          writerTimezone(_writerTimezone) {
-    // PASS
-  }
-
-  StripeStreamsImpl::~StripeStreamsImpl() {
-    // PASS
-  }
-
-  const ReaderOptions& StripeStreamsImpl::getReaderOptions() const {
-    return reader.getReaderOptions();
-  }
-
-  const std::vector<bool> StripeStreamsImpl::getSelectedColumns() const {
-    return reader.getSelectedColumns();
-  }
-
-  proto::ColumnEncoding StripeStreamsImpl::getEncoding(uint64_t columnId
-                                                       ) const {
-    return footer.columns(static_cast<int>(columnId));
-  }
-
-  const Timezone& StripeStreamsImpl::getWriterTimezone() const {
-    return writerTimezone;
-  }
-
-  std::unique_ptr<SeekableInputStream>
-  StripeStreamsImpl::getStream(uint64_t columnId,
-                               proto::Stream_Kind kind,
-                               bool shouldStream) const {
-    uint64_t offset = stripeStart;
-    for(int i = 0; i < footer.streams_size(); ++i) {
-      const proto::Stream& stream = footer.streams(i);
-      if (stream.has_kind() &&
-          stream.kind() == kind &&
-          stream.column() == static_cast<uint64_t>(columnId)) {
-        uint64_t myBlock = shouldStream ? input.getNaturalReadSize():
-          stream.length();
-        return createDecompressor(reader.getCompression(),
-                                  std::unique_ptr<SeekableInputStream>
-                                  (new SeekableFileInputStream
-                                   (&input,
-                                    offset,
-                                    stream.length(),
-                                    memoryPool,
-                                    myBlock)),
-                                  reader.getCompressionSize(),
-                                  memoryPool);
-      }
-      offset += stream.length();
-    }
-    return std::unique_ptr<SeekableInputStream>();
-  }
-
-  MemoryPool& StripeStreamsImpl::getMemoryPool() const {
-    return memoryPool;
-  }
-
-  void ReaderImpl::startNextStripe() {
+  void RowReaderImpl::startNextStripe() {
     reader.reset(); // ColumnReaders use lots of memory; free old memory first
     currentStripeInfo = footer->stripes(static_cast<int>(currentStripe));
     currentStripeFooter = getStripeFooter(currentStripeInfo);
@@ -1841,23 +756,13 @@ namespace orc {
         localTimezone;
     StripeStreamsImpl stripeStreams(*this, currentStripeFooter,
                                     currentStripeInfo.offset(),
-                                    *(stream.get()),
+                                    *(contents->stream.get()),
                                     memoryPool,
                                     writerTimezone);
-    reader = buildReader(*(schema.get()), stripeStreams);
-  }
-
-  void ReaderImpl::checkOrcVersion() {
-    std::string version = getFormatVersion();
-    if (version != "0.11" && version != "0.12") {
-      *(options.getErrorStream())
-        << "Warning: ORC file " << stream->getName()
-        << " was written in an unknown format version "
-        << version << "\n";
-    }
+    reader = buildReader(*contents->schema.get(), stripeStreams);
   }
 
-  bool ReaderImpl::next(ColumnVectorBatch& data) {
+  bool RowReaderImpl::next(ColumnVectorBatch& data) {
     if (currentStripe >= lastStripe) {
       data.numElements = 0;
       if (lastStripe > 0) {
@@ -1886,7 +791,7 @@ namespace orc {
     return rowsToRead != 0;
   }
 
-  std::unique_ptr<ColumnVectorBatch> ReaderImpl::createRowBatch
+  std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch
                                               (uint64_t capacity) const {
     return getSelectedType().createRowBatch(capacity, memoryPool);
   }
@@ -1978,8 +883,7 @@ namespace orc {
   std::unique_ptr<Reader> createReader(std::unique_ptr<InputStream> stream,
                                        const ReaderOptions& options) {
     MemoryPool *memoryPool = options.getMemoryPool();
-    std::unique_ptr<proto::PostScript> ps;
-    std::unique_ptr<proto::Footer> footer;
+    std::shared_ptr<FileContents> contents = std::shared_ptr<FileContents>(new FileContents());
     std::string serializedFooter = options.getSerializedFileTail();
     uint64_t fileLength;
     uint64_t postscriptLength;
@@ -1989,8 +893,8 @@ namespace orc {
       if (!tail.ParseFromString(serializedFooter)) {
         throw ParseError("Failed to parse the file tail from string");
       }
-      ps.reset(new proto::PostScript(tail.postscript()));
-      footer.reset(new proto::Footer(tail.footer()));
+      contents->postscript.reset(new proto::PostScript(tail.postscript()));
+      contents->footer.reset(new proto::Footer(tail.footer()));
       fileLength = tail.filelength();
       postscriptLength = tail.postscriptlength();
     } else {
@@ -2007,8 +911,9 @@ namespace orc {
       stream->read(buffer->data(), readSize, fileLength - readSize);
 
       postscriptLength = buffer->data()[readSize - 1] & 0xff;
-      ps = readPostscript(stream.get(), buffer, postscriptLength);
-      uint64_t footerSize = ps->footerlength();
+      contents->postscript = REDUNDANT_MOVE(readPostscript(stream.get(),
+        buffer, postscriptLength));
+      uint64_t footerSize = contents->postscript->footerlength();
       uint64_t tailSize = 1 + postscriptLength + footerSize;
       uint64_t footerOffset;
 
@@ -2020,330 +925,69 @@ namespace orc {
         footerOffset = readSize - tailSize;
       }
 
-      footer = readFooter(stream.get(), buffer, footerOffset, *ps,
-                          *memoryPool);
+      contents->footer = REDUNDANT_MOVE(readFooter(stream.get(), buffer,
+        footerOffset, *contents->postscript,  *memoryPool));
       delete buffer;
     }
-    return std::unique_ptr<Reader>(new ReaderImpl(std::move(stream),
+    contents->stream = std::move(stream);
+    return std::unique_ptr<Reader>(new ReaderImpl(std::move(contents),
                                                   options,
-                                                  std::move(ps),
-                                                  std::move(footer),
                                                   fileLength,
                                                   postscriptLength));
   }
 
-  ColumnStatistics::~ColumnStatistics() {
-    // PASS
-  }
-
-  BinaryColumnStatistics::~BinaryColumnStatistics() {
-    // PASS
-  }
-
-  BooleanColumnStatistics::~BooleanColumnStatistics() {
-    // PASS
-  }
-
-  DateColumnStatistics::~DateColumnStatistics() {
-    // PASS
-  }
-
-  DecimalColumnStatistics::~DecimalColumnStatistics() {
-    // PASS
-  }
-
-  DoubleColumnStatistics::~DoubleColumnStatistics() {
-    // PASS
-  }
-
-  IntegerColumnStatistics::~IntegerColumnStatistics() {
-    // PASS
-  }
-
-  StringColumnStatistics::~StringColumnStatistics() {
-    // PASS
-  }
-
-  TimestampColumnStatistics::~TimestampColumnStatistics() {
-    // PASS
-  }
-
-  ColumnStatisticsImpl::~ColumnStatisticsImpl() {
-    // PASS
-  }
-
-  BinaryColumnStatisticsImpl::~BinaryColumnStatisticsImpl() {
-    // PASS
-  }
-
-  BooleanColumnStatisticsImpl::~BooleanColumnStatisticsImpl() {
-    // PASS
-  }
-
-  DateColumnStatisticsImpl::~DateColumnStatisticsImpl() {
-    // PASS
-  }
-
-  DecimalColumnStatisticsImpl::~DecimalColumnStatisticsImpl() {
-    // PASS
+  std::string streamKindToString(StreamKind kind) {
+    switch (static_cast<int>(kind)) {
+    case StreamKind_PRESENT:
+      return "present";
+    case StreamKind_DATA:
+      return "data";
+    case StreamKind_LENGTH:
+      return "length";
+    case StreamKind_DICTIONARY_DATA:
+      return "dictionary";
+    case StreamKind_DICTIONARY_COUNT:
+      return "dictionary count";
+    case StreamKind_SECONDARY:
+      return "secondary";
+    case StreamKind_ROW_INDEX:
+      return "index";
+    case StreamKind_BLOOM_FILTER:
+      return "bloom";
+    }
+    std::stringstream buffer;
+    buffer << "unknown - " << kind;
+    return buffer.str();
   }
 
-  DoubleColumnStatisticsImpl::~DoubleColumnStatisticsImpl() {
-    // PASS
+  std::string columnEncodingKindToString(ColumnEncodingKind kind) {
+    switch (static_cast<int>(kind)) {
+    case ColumnEncodingKind_DIRECT:
+      return "direct";
+    case ColumnEncodingKind_DICTIONARY:
+      return "dictionary";
+    case ColumnEncodingKind_DIRECT_V2:
+      return "direct rle2";
+    case ColumnEncodingKind_DICTIONARY_V2:
+      return "dictionary rle2";
+    }
+    std::stringstream buffer;
+    buffer << "unknown - " << kind;
+    return buffer.str();
   }
 
-  IntegerColumnStatisticsImpl::~IntegerColumnStatisticsImpl() {
+  RowReader::~RowReader() {
     // PASS
   }
 
-  StringColumnStatisticsImpl::~StringColumnStatisticsImpl() {
+  Reader::~Reader() {
     // PASS
   }
 
-  TimestampColumnStatisticsImpl::~TimestampColumnStatisticsImpl() {
+  InputStream::~InputStream() {
     // PASS
-  }
-
-  ColumnStatisticsImpl::ColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb) {
-    valueCount = pb.numberofvalues();
-  }
-
-  BinaryColumnStatisticsImpl::BinaryColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb, bool correctStats){
-    valueCount = pb.numberofvalues();
-    if (!pb.has_binarystatistics() || !correctStats) {
-      _hasTotalLength = false;
-
-      totalLength = 0;
-    }else{
-      _hasTotalLength = pb.binarystatistics().has_sum();
-      totalLength = static_cast<uint64_t>(pb.binarystatistics().sum());
-    }
-  }
-
-  BooleanColumnStatisticsImpl::BooleanColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb, bool correctStats){
-    valueCount = pb.numberofvalues();
-    if (!pb.has_bucketstatistics() || !correctStats) {
-      _hasCount = false;
-      trueCount = 0;
-    }else{
-      _hasCount = true;
-      trueCount = pb.bucketstatistics().count(0);
-    }
-  }
-
-  DateColumnStatisticsImpl::DateColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb, bool correctStats){
-    valueCount = pb.numberofvalues();
-    if (!pb.has_datestatistics() || !correctStats) {
-      _hasMinimum = false;
-      _hasMaximum = false;
-
-      minimum = 0;
-      maximum = 0;
-    } else {
-      _hasMinimum = pb.datestatistics().has_minimum();
-      _hasMaximum = pb.datestatistics().has_maximum();
-      minimum = pb.datestatistics().minimum();
-      maximum = pb.datestatistics().maximum();
-    }
-  }
-
-  DecimalColumnStatisticsImpl::DecimalColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb, bool correctStats){
-    valueCount = pb.numberofvalues();
-    if (!pb.has_decimalstatistics() || !correctStats) {
-      _hasMinimum = false;
-      _hasMaximum = false;
-      _hasSum = false;
-    }else{
-      const proto::DecimalStatistics& stats = pb.decimalstatistics();
-      _hasMinimum = stats.has_minimum();
-      _hasMaximum = stats.has_maximum();
-      _hasSum = stats.has_sum();
-
-      minimum = stats.minimum();
-      maximum = stats.maximum();
-      sum = stats.sum();
-    }
-  }
-
-  DoubleColumnStatisticsImpl::DoubleColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb){
-    valueCount = pb.numberofvalues();
-    if (!pb.has_doublestatistics()) {
-      _hasMinimum = false;
-      _hasMaximum = false;
-      _hasSum = false;
-
-      minimum = 0;
-      maximum = 0;
-      sum = 0;
-    }else{
-      const proto::DoubleStatistics& stats = pb.doublestatistics();
-      _hasMinimum = stats.has_minimum();
-      _hasMaximum = stats.has_maximum();
-      _hasSum = stats.has_sum();
-
-      minimum = stats.minimum();
-      maximum = stats.maximum();
-      sum = stats.sum();
-    }
-  }
-
-  IntegerColumnStatisticsImpl::IntegerColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb){
-    valueCount = pb.numberofvalues();
-    if (!pb.has_intstatistics()) {
-      _hasMinimum = false;
-      _hasMaximum = false;
-      _hasSum = false;
-
-      minimum = 0;
-      maximum = 0;
-      sum = 0;
-    }else{
-      const proto::IntegerStatistics& stats = pb.intstatistics();
-      _hasMinimum = stats.has_minimum();
-      _hasMaximum = stats.has_maximum();
-      _hasSum = stats.has_sum();
-
-      minimum = stats.minimum();
-      maximum = stats.maximum();
-      sum = stats.sum();
-    }
-  }
-
-  StringColumnStatisticsImpl::StringColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb, bool correctStats){
-    valueCount = pb.numberofvalues();
-    if (!pb.has_stringstatistics() || !correctStats) {
-      _hasMinimum = false;
-      _hasMaximum = false;
-      _hasTotalLength = false;
-
-      totalLength = 0;
-    }else{
-      const proto::StringStatistics& stats = pb.stringstatistics();
-      _hasMinimum = stats.has_minimum();
-      _hasMaximum = stats.has_maximum();
-      _hasTotalLength = stats.has_sum();
-
-      minimum = stats.minimum();
-      maximum = stats.maximum();
-      totalLength = static_cast<uint64_t>(stats.sum());
-    }
-  }
-
-  TimestampColumnStatisticsImpl::TimestampColumnStatisticsImpl
-  (const proto::ColumnStatistics& pb, bool correctStats) {
-    valueCount = pb.numberofvalues();
-    if (!pb.has_timestampstatistics() || !correctStats) {
-      _hasMinimum = false;
-      _hasMaximum = false;
-      minimum = 0;
-      maximum = 0;
-    }else{
-      const proto::TimestampStatistics& stats = pb.timestampstatistics();
-      _hasMinimum = stats.has_minimum();
-      _hasMaximum = stats.has_maximum();
-
-      minimum = stats.minimum();
-      maximum = stats.maximum();
-    }
-  }
-
-  void ReaderImpl::updateSelectedByFieldId(uint64_t fieldId) {
-    if (fieldId < schema->getSubtypeCount()) {
-      selectChildren(*schema->getSubtype(fieldId));
-    } else {
-      std::stringstream buffer;
-      buffer << "Invalid column selected " << fieldId << " out of "
-             << schema->getSubtypeCount();
-      throw ParseError(buffer.str());
-    }
-  }
-
-  void ReaderImpl::updateSelectedByTypeId(uint64_t typeId) {
-    if (typeId < selectedColumns.size()) {
-      const Type& type = *idTypeMap[typeId];
-      selectChildren(type);
-    } else {
-      std::stringstream buffer;
-      buffer << "Invalid type id selected " << typeId << " out of "
-             << selectedColumns.size();
-      throw ParseError(buffer.str());
-    }
-  }
-
-  void ReaderImpl::updateSelectedByName(const std::string& fieldName) {
-    std::map<std::string, uint64_t>::const_iterator ite = nameIdMap.find(fieldName);
-    if (ite != nameIdMap.end()) {
-      updateSelectedByTypeId(ite->second);
-    } else {
-      throw ParseError("Invalid column selected " + fieldName);
-    }
-  }
-
-  void ReaderImpl::selectChildren(const Type& type) {
-    size_t id = static_cast<size_t>(type.getColumnId());
-    if (!selectedColumns[id]) {
-      selectedColumns[id] = true;
-      for(size_t c = id; c <= type.getMaximumColumnId(); ++c){
-        selectedColumns[c] = true;
-      }
-    }
-  }
-
-  /**
-   * Recurses over a type tree and selects the parents of every selected type.
-   * @return true if any child was selected.
-   */
-  bool ReaderImpl::selectParents(const Type& type) {
-    size_t id = static_cast<size_t>(type.getColumnId());
-    bool result = selectedColumns[id];
-    for(uint64_t c=0; c < type.getSubtypeCount(); ++c) {
-      result |= selectParents(*type.getSubtype(c));
-    }
-    selectedColumns[id] = result;
-    return result;
-  }
-
-  /**
-   * Recurses over a type tree and build two maps
-   * map<TypeName, TypeId>, map<TypeId, Type>
-   */
-  void ReaderImpl::buildTypeNameIdMap(const Type* type, std::vector<std::string>& columns) {
-    // map<type_id, Type*>
-    idTypeMap[type->getColumnId()] = type;
+  };
 
-    if (orc::STRUCT == type->getKind()) {
-      for (size_t i = 0; i < type->getSubtypeCount(); ++i) {
-        const std::string& fieldName = type->getFieldName(i);
-        columns.push_back(fieldName);
-        nameIdMap[toDotColumnPath(columns)] = type->getSubtype(i)->getColumnId();
-        buildTypeNameIdMap(type->getSubtype(i), columns);
-        columns.pop_back();
-      }
-    } else {
-      // other non-primitive type
-      for (size_t j = 0; j < type->getSubtypeCount(); ++j) {
-        buildTypeNameIdMap(type->getSubtype(j), columns);
-      }
-    }
-  }
 
-  std::string ReaderImpl::toDotColumnPath(const std::vector<std::string>& columns) {
-      if (columns.empty()) {
-          return std::string();
-      }
-      std::ostringstream columnStream;
-      std::copy(columns.begin(), columns.end(),
-              std::ostream_iterator<std::string>(columnStream, "."));
-      std::string columnPath = columnStream.str();
-      return columnPath.substr(0, columnPath.length() - 1);
-  }
 
 }// namespace

http://git-wip-us.apache.org/repos/asf/orc/blob/6bf63bb1/c++/src/Reader.hh
----------------------------------------------------------------------
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
new file mode 100644
index 0000000..3b2eac1
--- /dev/null
+++ b/c++/src/Reader.hh
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_READER_IMPL_HH
+#define ORC_READER_IMPL_HH
+
+#include "orc/Int128.hh"
+#include "orc/OrcFile.hh"
+#include "orc/Reader.hh"
+
+#include "ColumnReader.hh"
+#include "Exceptions.hh"
+#include "RLE.hh"
+#include "TypeImpl.hh"
+
+namespace orc {
+
+  class ReaderOptions;
+  class RowReaderOptions;
+  class StripeInformation;
+
+  static const uint64_t DIRECTORY_SIZE_GUESS = 16 * 1024;
+
+  /**
+  * State shared between Reader and Row Reader
+  */
+  struct FileContents {
+    std::unique_ptr<InputStream> stream;
+    std::unique_ptr<proto::PostScript> postscript;
+    std::unique_ptr<proto::Footer> footer;
+    std::unique_ptr<Type> schema;
+    uint64_t blockSize;
+    CompressionKind compression;
+  };
+
+  class ReaderImpl;
+
+  class ColumnSelector {
+   private:
+    std::map<std::string, uint64_t> nameIdMap;
+    std::map<uint64_t, const Type*> idTypeMap;
+    const FileContents* contents;
+    std::vector<std::string> columns;
+
+    // build map from type name and id, id to Type
+    void buildTypeNameIdMap(const Type* type);
+    std::string toDotColumnPath();
+
+   public:
+    // Select a field by name
+    void updateSelectedByName(std::vector<bool>& selectedColumns, const std::string& name);
+    // Select a field by id
+    void updateSelectedByFieldId(std::vector<bool>& selectedColumns, uint64_t fieldId);
+    // Select a type by id
+    void updateSelectedByTypeId(std::vector<bool>& selectedColumns, uint64_t typeId);
+
+    // Select all of the recursive children of the given type.
+    void selectChildren(std::vector<bool>& selectedColumns, const Type& type);
+
+    // For each child of type, select it if one of its children
+    // is selected.
+    bool selectParents(std::vector<bool>& selectedColumns, const Type& type);
+   /**
+    * Constructor that selects columns.
+    * @param contents of the file
+    */
+    ColumnSelector(const FileContents* contents);
+
+    // Select the columns from the RowReaderoptions object
+    void updateSelected(std::vector<bool>& selectedColumns, const RowReaderOptions& options);
+
+    // Select the columns from the Readeroptions object
+    void updateSelected(std::vector<bool>& selectedColumns, const ReaderOptions& options);
+  };
+
+
+  class RowReaderImpl : public RowReader {
+  private:
+    const Timezone& localTimezone;
+
+    // contents
+    std::shared_ptr<FileContents> contents;
+
+    // inputs
+    std::vector<bool> selectedColumns;
+    const RowReaderOptions& options;
+
+    // custom memory pool
+    MemoryPool& memoryPool;
+
+    // footer
+    proto::Footer* footer;
+    DataBuffer<uint64_t> firstRowOfStripe;
+    mutable std::unique_ptr<Type> selectedSchema;
+
+    // reading state
+    uint64_t previousRow;
+    uint64_t firstStripe;
+    uint64_t currentStripe;
+    uint64_t lastStripe; // the stripe AFTER the last one
+    uint64_t currentRowInStripe;
+    uint64_t rowsInCurrentStripe;
+    proto::StripeInformation currentStripeInfo;
+    proto::StripeFooter currentStripeFooter;
+    std::unique_ptr<ColumnReader> reader;
+
+    // internal methods
+    proto::StripeFooter getStripeFooter(const proto::StripeInformation& info);
+    void startNextStripe();
+
+  public:
+   /**
+    * Constructor that lets the user specify additional options.
+    * @param contents of the file
+    * @param options options for reading
+    */
+    RowReaderImpl(std::shared_ptr<FileContents> contents,
+           const RowReaderOptions& options);
+
+    // Select the columns from the options object
+    void updateSelected();
+    const std::vector<bool> getSelectedColumns() const override;
+
+    const Type& getSelectedType() const override;
+
+    std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size
+                                                      ) const override;
+
+    bool next(ColumnVectorBatch& data) override;
+
+    const RowReaderOptions& getRowReaderOptions() const;
+
+    CompressionKind getCompression() const;
+
+    uint64_t getCompressionSize() const;
+
+    uint64_t getRowNumber() const override;
+
+    void seekToRow(uint64_t rowNumber) override;
+
+    MemoryPool* getMemoryPool() const ;
+
+  };
+
+  class ReaderImpl : public Reader {
+   private:
+    // FileContents
+    std::shared_ptr<FileContents> contents;
+
+    // inputs
+    const ReaderOptions& options;
+    const uint64_t fileLength;
+    const uint64_t postscriptLength;
+
+    // custom memory pool
+    MemoryPool& memoryPool;
+
+    // footer
+    proto::Footer* footer;
+    uint64_t numberOfStripes;
+    uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);
+
+    // internal methods
+    void readMetadata() const;
+    void checkOrcVersion();
+
+    // metadata
+    mutable std::unique_ptr<proto::Metadata> metadata;
+    mutable bool isMetadataLoaded;
+   public:
+    /**
+     * Constructor that lets the user specify additional options.
+     * @param contents of the file
+     * @param options options for reading
+     * @param fileLength the length of the file in bytes
+     * @param postscriptLength the length of the postscript in bytes
+     */
+    ReaderImpl(std::shared_ptr<FileContents> contents,
+               const ReaderOptions& options,
+               uint64_t fileLength,
+               uint64_t postscriptLength);
+
+    const ReaderOptions& getReaderOptions() const;
+
+    CompressionKind getCompression() const override;
+
+    std::string getFormatVersion() const override;
+
+    WriterVersion getWriterVersion() const override;
+
+    uint64_t getNumberOfRows() const override;
+
+    uint64_t getRowIndexStride() const override;
+
+    std::list<std::string> getMetadataKeys() const override;
+
+    std::string getMetadataValue(const std::string& key) const override;
+
+    bool hasMetadataValue(const std::string& key) const override;
+
+    uint64_t getCompressionSize() const override;
+
+    uint64_t getNumberOfStripes() const override;
+
+    std::unique_ptr<StripeInformation> getStripe(uint64_t
+                                                 ) const override;
+
+    uint64_t getNumberOfStripeStatistics() const override;
+
+    const std::string& getStreamName() const override;
+
+    std::unique_ptr<Statistics>
+    getStripeStatistics(uint64_t stripeIndex) const override;
+
+    std::unique_ptr<RowReader> getRowReader(const RowReaderOptions& options
+                                           ) const override;
+
+    uint64_t getContentLength() const override;
+    uint64_t getStripeStatisticsLength() const override;
+    uint64_t getFileFooterLength() const override;
+    uint64_t getFilePostscriptLength() const override;
+    uint64_t getFileLength() const override;
+
+    std::unique_ptr<Statistics> getStatistics() const override;
+
+    std::unique_ptr<ColumnStatistics> getColumnStatistics(uint32_t columnId
+                                                          ) const override;
+
+    std::string getSerializedFileTail() const override;
+
+    const Type& getType() const override;
+
+    bool hasCorrectStatistics() const override;
+
+    const proto::PostScript* getPostscript() const {return contents->postscript.get();}
+
+    uint64_t getBlockSize() const {return contents->blockSize;}
+
+    const proto::Footer* getFooter() const {return contents->footer.get();}
+
+    const Type* getSchema() const {return contents->schema.get();}
+
+    InputStream* getStream() const {return contents->stream.get();}
+
+    uint64_t getMemoryUse(int stripeIx = -1) override;
+
+    uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx=-1) override;
+
+    uint64_t getMemoryUseByName(const std::list<std::string>& names, int stripeIx=-1) override;
+
+    uint64_t getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx=-1) override;
+  };
+
+}// namespace
+
+#endif