You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2015/07/06 23:52:47 UTC

[18/23] orc git commit: ORC-23. Simplify directory structure.

http://git-wip-us.apache.org/repos/asf/orc/blob/7f55b453/c++/src/orc/ColumnReader.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/ColumnReader.cc b/c++/src/orc/ColumnReader.cc
deleted file mode 100644
index 0b6a9cb..0000000
--- a/c++/src/orc/ColumnReader.cc
+++ /dev/null
@@ -1,1557 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "orc/Adaptor.hh"
-#include "ByteRLE.hh"
-#include "ColumnReader.hh"
-#include "Exceptions.hh"
-#include "orc/Int128.hh"
-#include "RLE.hh"
-
-#include <math.h>
-#include <iostream>
-
-namespace orc {
-
-  StripeStreams::~StripeStreams() {
-    // Intentionally empty destructor body.
-  }
-
-  /**
-   * Map a column encoding kind to the RLE version its streams use.
-   * DIRECT and DICTIONARY map to RleVersion_1; DIRECT_V2 and DICTIONARY_V2
-   * map to RleVersion_2.
-   * @throws ParseError for any other encoding kind
-   */
-  inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
-    const int64_t code = static_cast<int64_t>(kind);
-    if (code == proto::ColumnEncoding_Kind_DIRECT ||
-        code == proto::ColumnEncoding_Kind_DICTIONARY) {
-      return RleVersion_1;
-    }
-    if (code == proto::ColumnEncoding_Kind_DIRECT_V2 ||
-        code == proto::ColumnEncoding_Kind_DICTIONARY_V2) {
-      return RleVersion_2;
-    }
-    throw ParseError("Unknown encoding in convertRleVersion");
-  }
-
-  ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
-    : columnId(type.getColumnId()),
-      memoryPool(stripe.getMemoryPool()) {
-    // The PRESENT stream may be absent (null). When the stripe supplies one,
-    // wrap it in a boolean RLE decoder that next()/skip() use as the
-    // per-row not-null flags.
-    if (std::unique_ptr<SeekableInputStream> presentStream =
-          stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true)) {
-      notNullDecoder = createBooleanRleDecoder(std::move(presentStream));
-    }
-  }
-
-  ColumnReader::~ColumnReader() {
-    // Intentionally empty destructor body.
-  }
-
-  /**
-   * Skip numValues rows of the PRESENT stream, if this column has one.
-   * @param numValues the number of rows to skip
-   * @return how many of the skipped rows were non-null; this is numValues
-   *         itself when the column has no PRESENT stream
-   */
-  uint64_t ColumnReader::skip(uint64_t numValues) {
-    ByteRleDecoder* presentDecoder = notNullDecoder.get();
-    if (presentDecoder == nullptr) {
-      return numValues;
-    }
-    // Decode the not-null flags in fixed-size pages and count the nulls.
-    const size_t MAX_BUFFER_SIZE = 32768;
-    const size_t pageSize = std::min(MAX_BUFFER_SIZE,
-                                     static_cast<size_t>(numValues));
-    char flags[MAX_BUFFER_SIZE];
-    uint64_t nullCount = 0;
-    for (uint64_t remaining = numValues; remaining > 0; ) {
-      const uint64_t chunk = std::min(remaining,
-                                      static_cast<uint64_t>(pageSize));
-      presentDecoder->next(flags, chunk, nullptr);
-      remaining -= chunk;
-      for (uint64_t i = 0; i < chunk; ++i) {
-        if (flags[i] == 0) {
-          ++nullCount;
-        }
-      }
-    }
-    return numValues - nullCount;
-  }
-
-  /**
-   * Size rowBatch for numValues rows and fill in its not-null flags.
-   * With a PRESENT stream, hasNulls is set only if a decoded flag is zero.
-   * Without one, an incoming mask is copied and hasNulls is set to true.
-   * With neither, hasNulls is false.
-   * @param incomingMask optional not-null mask from the caller (may be null)
-   */
-  void ColumnReader::next(ColumnVectorBatch& rowBatch,
-                          uint64_t numValues,
-                          char* incomingMask) {
-    if (rowBatch.capacity < numValues) {
-      rowBatch.resize(numValues);
-    }
-    rowBatch.numElements = numValues;
-    char* notNullArray = rowBatch.notNull.data();
-
-    ByteRleDecoder* presentDecoder = notNullDecoder.get();
-    if (presentDecoder == nullptr) {
-      if (incomingMask != nullptr) {
-        // No PRESENT stream: the incoming mask alone defines the nulls.
-        rowBatch.hasNulls = true;
-        memcpy(notNullArray, incomingMask, numValues);
-      } else {
-        rowBatch.hasNulls = false;
-      }
-      return;
-    }
-
-    presentDecoder->next(notNullArray, numValues, incomingMask);
-    bool sawNull = false;
-    for (uint64_t i = 0; i < numValues && !sawNull; ++i) {
-      sawNull = !notNullArray[i];
-    }
-    rowBatch.hasNulls = sawNull;
-  }
-
-  /**
-   * Expand an array of bytes in place to the corresponding array of longs.
-   * Has to work backwards so that the data isn't clobbered during the
-   * expansion.
-   * Each byte is read as a signed 8-bit value whether or not plain char is
-   * signed on the target platform, so negative byte values sign-extend
-   * correctly.
-   * @param buffer the array of chars and array of longs that need to be
-   *        expanded
-   * @param numValues the number of bytes to convert to longs
-   */
-  void expandBytesToLongs(int64_t* buffer, uint64_t numValues) {
-    const signed char* bytes = reinterpret_cast<const signed char*>(buffer);
-    // Walk from the end. The int64 written for index i-1 covers bytes
-    // [8(i-1), 8i), all of which are >= i-1, so the bytes not yet read
-    // (indexes < i-1) are never overwritten.
-    for (uint64_t i = numValues; i > 0; --i) {
-      buffer[i - 1] = bytes[i - 1];
-    }
-  }
-
-  /**
-   * Reader for boolean columns. The DATA stream is decoded with a boolean
-   * RLE decoder into one byte per row, and the bytes are then widened into
-   * the int64 slots of a LongVectorBatch.
-   */
-  class BooleanColumnReader: public ColumnReader {
-  public:
-    BooleanColumnReader(const Type& type, StripeStreams& stripe);
-    ~BooleanColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char* notNull) override;
-
-  private:
-    std::unique_ptr<orc::ByteRleDecoder> rle;
-  };
-
-  BooleanColumnReader::BooleanColumnReader(const Type& type,
-                                           StripeStreams& stripe)
-    : ColumnReader(type, stripe),
-      rle(createBooleanRleDecoder(stripe.getStream(columnId,
-                                                   proto::Stream_Kind_DATA,
-                                                   true))) {
-  }
-
-  BooleanColumnReader::~BooleanColumnReader() {
-    // Intentionally empty destructor body.
-  }
-
-  uint64_t BooleanColumnReader::skip(uint64_t numValues) {
-    // The DATA stream is advanced only by the non-null count.
-    const uint64_t presentValues = ColumnReader::skip(numValues);
-    rle->skip(presentValues);
-    return presentValues;
-  }
-
-  void BooleanColumnReader::next(ColumnVectorBatch& rowBatch,
-                                 uint64_t numValues,
-                                 char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    int64_t* values = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
-    char* mask = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
-    // Decode one byte per row into the front of the int64 buffer, then
-    // widen those bytes to int64 in place.
-    rle->next(reinterpret_cast<char*>(values), numValues, mask);
-    expandBytesToLongs(values, numValues);
-  }
-
-  /**
-   * Reader for byte columns. The DATA stream is decoded with a byte RLE
-   * decoder into one byte per row, and the bytes are then widened into the
-   * int64 slots of a LongVectorBatch.
-   */
-  class ByteColumnReader: public ColumnReader {
-  public:
-    ByteColumnReader(const Type& type, StripeStreams& stripe);
-    ~ByteColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char* notNull) override;
-
-  private:
-    std::unique_ptr<orc::ByteRleDecoder> rle;
-  };
-
-  ByteColumnReader::ByteColumnReader(const Type& type,
-                                     StripeStreams& stripe)
-    : ColumnReader(type, stripe),
-      rle(createByteRleDecoder(stripe.getStream(columnId,
-                                                proto::Stream_Kind_DATA,
-                                                true))) {
-  }
-
-  ByteColumnReader::~ByteColumnReader() {
-    // Intentionally empty destructor body.
-  }
-
-  uint64_t ByteColumnReader::skip(uint64_t numValues) {
-    // The DATA stream is advanced only by the non-null count.
-    const uint64_t presentValues = ColumnReader::skip(numValues);
-    rle->skip(presentValues);
-    return presentValues;
-  }
-
-  void ByteColumnReader::next(ColumnVectorBatch& rowBatch,
-                              uint64_t numValues,
-                              char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    int64_t* values = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
-    char* mask = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
-    // Decode one byte per row into the front of the int64 buffer, then
-    // widen those bytes to int64 in place.
-    rle->next(reinterpret_cast<char*>(values), numValues, mask);
-    expandBytesToLongs(values, numValues);
-  }
-
-  /**
-   * Reader for integer columns backed by an integer RLE-encoded DATA
-   * stream that decodes straight into a LongVectorBatch.
-   */
-  class IntegerColumnReader: public ColumnReader {
-  public:
-    IntegerColumnReader(const Type& type, StripeStreams& stripe);
-    ~IntegerColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char* notNull) override;
-
-  protected:
-    // DATA stream decoder; also used by subclasses.
-    std::unique_ptr<orc::RleDecoder> rle;
-  };
-
-  IntegerColumnReader::IntegerColumnReader(const Type& type,
-                                           StripeStreams& stripe)
-    : ColumnReader(type, stripe) {
-    const RleVersion version =
-      convertRleVersion(stripe.getEncoding(columnId).kind());
-    std::unique_ptr<SeekableInputStream> dataStream =
-      stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-    // NOTE(review): `true` presumably requests signed decoding; the
-    // timestamp nano stream passes false. Confirm against RLE.hh.
-    rle = createRleDecoder(std::move(dataStream), true, version, memoryPool);
-  }
-
-  IntegerColumnReader::~IntegerColumnReader() {
-    // Intentionally empty destructor body.
-  }
-
-  uint64_t IntegerColumnReader::skip(uint64_t numValues) {
-    const uint64_t presentValues = ColumnReader::skip(numValues);
-    rle->skip(presentValues);
-    return presentValues;
-  }
-
-  void IntegerColumnReader::next(ColumnVectorBatch& rowBatch,
-                                 uint64_t numValues,
-                                 char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(rowBatch);
-    char* mask = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
-    rle->next(longBatch.data.data(), numValues, mask);
-  }
-
-  /**
-   * Reader for timestamp columns. Seconds come from the inherited DATA
-   * decoder and encoded nanoseconds from the SECONDARY stream. next()
-   * combines them into a single int64 nanosecond value per row.
-   */
-  class TimestampColumnReader: public IntegerColumnReader {
-  public:
-    TimestampColumnReader(const Type& type, StripeStreams& stripe);
-    ~TimestampColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char* notNull) override;
-
-  private:
-    std::unique_ptr<orc::RleDecoder> nanoRle;
-    // Scratch space for the decoded SECONDARY values of one batch.
-    DataBuffer<int64_t> nanoBuffer;
-  };
-
-  TimestampColumnReader::TimestampColumnReader(const Type& type,
-                                               StripeStreams& stripe)
-    : IntegerColumnReader(type, stripe),
-      nanoBuffer(memoryPool, 1024) {
-    const RleVersion version =
-      convertRleVersion(stripe.getEncoding(columnId).kind());
-    nanoRle = createRleDecoder(stripe.getStream(columnId,
-                                                proto::Stream_Kind_SECONDARY,
-                                                true),
-                               false, version, memoryPool);
-  }
-
-  TimestampColumnReader::~TimestampColumnReader() {
-    // Intentionally empty destructor body.
-  }
-
-  uint64_t TimestampColumnReader::skip(uint64_t numValues) {
-    const uint64_t presentValues = IntegerColumnReader::skip(numValues);
-    nanoRle->skip(presentValues);
-    return presentValues;
-  }
-
-  void TimestampColumnReader::next(ColumnVectorBatch& rowBatch,
-                                   uint64_t numValues,
-                                   char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    char* mask = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
-    int64_t* stamps = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
-
-    if (nanoBuffer.size() < numValues) {
-      nanoBuffer.resize(numValues);
-    }
-
-    rle->next(stamps, numValues, mask);
-    nanoRle->next(nanoBuffer.data(), numValues, mask);
-
-    for (uint64_t row = 0; row < numValues; ++row) {
-      if (mask != nullptr && !mask[row]) {
-        continue;
-      }
-      // The low 3 bits are a scale tag z. When z != 0, the remaining bits
-      // are multiplied by 10 (z + 1) times.
-      const int64_t encoded = nanoBuffer[row];
-      int64_t nanos = encoded >> 3;
-      const uint64_t scaleTag = static_cast<uint64_t>(encoded & 0x7);
-      if (scaleTag != 0) {
-        for (uint64_t k = 0; k < scaleTag + 1; ++k) {
-          nanos *= 10;
-        }
-      }
-      // 1420070400 s is 2015-01-01T00:00:00Z; the stored seconds are
-      // offset from it, and the result is expressed in nanoseconds.
-      int64_t total = stamps[row] * 1000000000 + 1420070400000000000;
-      total = (total >= 0) ? total + nanos : total - nanos;
-      stamps[row] = total;
-    }
-  }
-
-  /**
-   * Reader for float and double columns. Values in the DATA stream are
-   * little-endian bit patterns: 4 bytes when the column kind is FLOAT and
-   * 8 bytes otherwise. They are reinterpreted as the host float/double.
-   */
-  class DoubleColumnReader: public ColumnReader {
-  public:
-    DoubleColumnReader(const Type& type, StripeStreams& stripe);
-    ~DoubleColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char* notNull) override;
-
-  private:
-    std::unique_ptr<SeekableInputStream> inputStream;
-    TypeKind columnKind;
-    const uint64_t bytesPerValue ;
-    // Unconsumed window [bufferPointer, bufferEnd) of the current chunk
-    // returned by inputStream.
-    const char *bufferPointer;
-    const char *bufferEnd;
-
-    /**
-     * Return the next byte of the DATA stream, fetching a new chunk when the
-     * current one is used up.
-     * @throws ParseError if the stream has no more data
-     */
-    unsigned char readByte() {
-      if (bufferPointer == bufferEnd) {
-        int length;
-        if (!inputStream->Next
-            (reinterpret_cast<const void**>(&bufferPointer), &length)) {
-          throw ParseError("bad read in DoubleColumnReader::next()");
-        }
-        bufferEnd = bufferPointer + length;
-      }
-      return static_cast<unsigned char>(*(bufferPointer++));
-    }
-
-    /**
-     * Decode 8 little-endian bytes as a double. The bits are assembled in
-     * an unsigned integer, because shifting into the sign bit of a signed
-     * type is undefined before C++20. They are then copied with memcpy,
-     * because reading an int64_t through a double* violates strict aliasing.
-     */
-    double readDouble() {
-      uint64_t bits = 0;
-      for (uint64_t i = 0; i < 8; i++) {
-        bits |= static_cast<uint64_t>(readByte()) << (i * 8);
-      }
-      double result;
-      memcpy(&result, &bits, sizeof(result));
-      return result;
-    }
-
-    /**
-     * Decode 4 little-endian bytes as a float, widened to double. Each
-     * byte is widened to uint32_t before shifting; the old `int` shift by
-     * 24 overflowed for bytes >= 0x80.
-     */
-    double readFloat() {
-      uint32_t bits = 0;
-      for (uint64_t i = 0; i < 4; i++) {
-        bits |= static_cast<uint32_t>(readByte()) << (i * 8);
-      }
-      float result;
-      memcpy(&result, &bits, sizeof(result));
-      return result;
-    }
-  };
-
-  DoubleColumnReader::DoubleColumnReader(const Type& type,
-                                         StripeStreams& stripe
-                                         ): ColumnReader(type, stripe),
-                                            inputStream
-                                               (stripe.getStream
-                                                (columnId,
-                                                 proto::Stream_Kind_DATA,
-                                                 true)),
-                                            columnKind(type.getKind()),
-                                            bytesPerValue((type.getKind() ==
-                                                           FLOAT) ? 4 : 8),
-                                            bufferPointer(NULL),
-                                            bufferEnd(NULL) {
-    // PASS
-  }
-
-  DoubleColumnReader::~DoubleColumnReader() {
-    // PASS
-  }
-
-  uint64_t DoubleColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-
-    // Consume from the current window when possible; otherwise skip the
-    // remainder in the stream and drop the window.
-    if (static_cast<size_t>(bufferEnd - bufferPointer) >=
-        bytesPerValue * numValues) {
-      bufferPointer+= bytesPerValue*numValues;
-    } else {
-      inputStream->Skip(static_cast<int>(bytesPerValue*numValues -
-                                         static_cast<size_t>(bufferEnd -
-                                                             bufferPointer)));
-      bufferEnd = NULL;
-      bufferPointer = NULL;
-    }
-
-    return numValues;
-  }
-
-  void DoubleColumnReader::next(ColumnVectorBatch& rowBatch,
-                                uint64_t numValues,
-                                char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    // update the notNull from the parent class
-    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0;
-    double* outArray = dynamic_cast<DoubleVectorBatch&>(rowBatch).data.data();
-
-    // Null rows have no bytes in the DATA stream, so they are left untouched.
-    if (columnKind == FLOAT) {
-      if (notNull) {
-        for(size_t i=0; i < numValues; ++i) {
-          if (notNull[i]) {
-            outArray[i] = readFloat();
-          }
-        }
-      } else {
-        for(size_t i=0; i < numValues; ++i) {
-          outArray[i] = readFloat();
-        }
-      }
-    } else {
-      if (notNull) {
-        for(size_t i=0; i < numValues; ++i) {
-          if (notNull[i]) {
-            outArray[i] = readDouble();
-          }
-        }
-      } else {
-        for(size_t i=0; i < numValues; ++i) {
-          outArray[i] = readDouble();
-        }
-      }
-    }
-  }
-
-  /**
-   * Copy exactly bufferSize bytes from stream into buffer.
-   * When a chunk returned by the stream extends past the bytes still
-   * needed, only the needed prefix is copied and the surplus is handed back
-   * with BackUp(). The caller's buffer is therefore never overrun.
-   * @throws ParseError if the stream ends before bufferSize bytes are read
-   */
-  void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
-    int64_t posn = 0;
-    while (posn < bufferSize) {
-      const void* chunk;
-      int length;
-      if (!stream->Next(&chunk, &length)) {
-        throw ParseError("bad read in readFully");
-      }
-      const int64_t wanted = bufferSize - posn;
-      const int64_t copied = std::min(static_cast<int64_t>(length), wanted);
-      memcpy(buffer + posn, chunk, static_cast<size_t>(copied));
-      posn += copied;
-      if (copied < length) {
-        // Return the unused tail of this chunk to the stream.
-        stream->BackUp(static_cast<int>(length - copied));
-      }
-    }
-  }
-
-  /**
-   * Reader for dictionary-encoded string columns. The constructor loads
-   * the whole dictionary (LENGTH + DICTIONARY_DATA streams) into memory.
-   * next() turns the per-row indexes from the DATA stream into pointers
-   * and lengths inside that dictionary.
-   */
-  class StringDictionaryColumnReader: public ColumnReader {
-  private:
-    // Concatenated bytes of all dictionary entries.
-    DataBuffer<char> dictionaryBlob;
-    // dictionaryCount + 1 prefix offsets into dictionaryBlob; entry e spans
-    // [dictionaryOffset[e], dictionaryOffset[e + 1]).
-    DataBuffer<int64_t> dictionaryOffset;
-    std::unique_ptr<RleDecoder> rle;
-    uint64_t dictionaryCount;
-
-  public:
-    StringDictionaryColumnReader(const Type& type, StripeStreams& stripe);
-    ~StringDictionaryColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-  };
-
-  StringDictionaryColumnReader::StringDictionaryColumnReader
-             (const Type& type,
-              StripeStreams& stripe
-              ): ColumnReader(type, stripe),
-                 dictionaryBlob(stripe.getMemoryPool()),
-                 dictionaryOffset(stripe.getMemoryPool()) {
-    RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
-                                                .kind());
-    dictionaryCount = stripe.getEncoding(columnId).dictionarysize();
-    rle = createRleDecoder(stripe.getStream(columnId,
-                                            proto::Stream_Kind_DATA,
-                                            true),
-                           false, rleVersion, memoryPool);
-    std::unique_ptr<RleDecoder> lengthDecoder =
-      createRleDecoder(stripe.getStream(columnId,
-                                        proto::Stream_Kind_LENGTH,
-                                        false),
-                       false, rleVersion, memoryPool);
-    // Decode the entry lengths into slots 1..count, then prefix-sum them
-    // into offsets.
-    dictionaryOffset.resize(dictionaryCount+1);
-    int64_t* lengthArray = dictionaryOffset.data();
-    lengthDecoder->next(lengthArray + 1, dictionaryCount, 0);
-    lengthArray[0] = 0;
-    for(uint64_t i=1; i < dictionaryCount + 1; ++i) {
-      lengthArray[i] += lengthArray[i-1];
-    }
-    int64_t blobSize = lengthArray[dictionaryCount];
-    dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
-    std::unique_ptr<SeekableInputStream> blobStream =
-      stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
-    readFully(dictionaryBlob.data(), blobSize, blobStream.get());
-  }
-
-  StringDictionaryColumnReader::~StringDictionaryColumnReader() {
-    // PASS
-  }
-
-  uint64_t StringDictionaryColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-    rle->skip(numValues);
-    return numValues;
-  }
-
-  void StringDictionaryColumnReader::next(ColumnVectorBatch& rowBatch,
-                                          uint64_t numValues,
-                                          char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    // update the notNull from the parent class
-    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0;
-    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
-    char *blob = dictionaryBlob.data();
-    int64_t *dictionaryOffsets = dictionaryOffset.data();
-    char **outputStarts = byteBatch.data.data();
-    int64_t *outputLengths = byteBatch.length.data();
-    // Decode the dictionary indexes into the length array first, then
-    // replace each index with the length of the entry it names.
-    rle->next(outputLengths, numValues, notNull);
-    for(uint64_t i=0; i < numValues; ++i) {
-      if (notNull && !notNull[i]) {
-        continue;
-      }
-      const int64_t entry = outputLengths[i];
-      // Indexes come from the file; reject out-of-range ones rather than
-      // reading past the end of dictionaryOffset.
-      if (entry < 0 || static_cast<uint64_t>(entry) >= dictionaryCount) {
-        throw ParseError("Entry index out of range in StringDictionaryColumn");
-      }
-      outputStarts[i] = blob + dictionaryOffsets[entry];
-      outputLengths[i] = dictionaryOffsets[entry+1] -
-        dictionaryOffsets[entry];
-    }
-  }
-
-  /**
-   * Reader for string values stored directly rather than through a
-   * dictionary. The LENGTH stream gives each non-null value's byte count,
-   * and the DATA stream holds the concatenated bytes.
-   */
-  class StringDirectColumnReader: public ColumnReader {
-  private:
-    // Holds values that straddle chunk boundaries of blobStream.
-    DataBuffer<char> blobBuffer;
-    std::unique_ptr<RleDecoder> lengthRle;
-    std::unique_ptr<SeekableInputStream> blobStream;
-    // Unconsumed tail of the most recent chunk returned by blobStream.
-    const char *lastBuffer;
-    size_t lastBufferLength;
-
-    /**
-     * Compute the total length of the values.
-     * @param lengths the array of lengths
-     * @param notNull the array of notNull flags
-     * @param numValues the lengths of the arrays
-     * @return the total number of bytes for the non-null values
-     */
-    size_t computeSize(const int64_t *lengths, const char *notNull,
-                       uint64_t numValues);
-
-  public:
-    StringDirectColumnReader(const Type& type, StripeStreams& stripe);
-    ~StringDirectColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-  };
-
-  StringDirectColumnReader::StringDirectColumnReader
-                 (const Type& type,
-                  StripeStreams& stripe
-                  ): ColumnReader(type, stripe),
-                     blobBuffer(stripe.getMemoryPool()) {
-    RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId)
-                                                .kind());
-    lengthRle = createRleDecoder(stripe.getStream(columnId,
-                                                  proto::Stream_Kind_LENGTH,
-                                                  true),
-                                 false, rleVersion, memoryPool);
-    blobStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-    lastBuffer = 0;
-    lastBufferLength = 0;
-  }
-
-  StringDirectColumnReader::~StringDirectColumnReader() {
-    // PASS
-  }
-
-  uint64_t StringDirectColumnReader::skip(uint64_t numValues) {
-    const size_t BUFFER_SIZE = 1024;
-    numValues = ColumnReader::skip(numValues);
-    int64_t buffer[BUFFER_SIZE];
-    uint64_t done = 0;
-    size_t totalBytes = 0;
-    // read the lengths, so we know how many bytes to skip
-    while (done < numValues) {
-      uint64_t step = std::min(BUFFER_SIZE,
-                                    static_cast<size_t>(numValues - done));
-      lengthRle->next(buffer, step, 0);
-      totalBytes += computeSize(buffer, 0, step);
-      done += step;
-    }
-    if (totalBytes <= lastBufferLength) {
-      // subtract the needed bytes from the ones left over
-      lastBufferLength -= totalBytes;
-      lastBuffer += totalBytes;
-    } else {
-      // move the stream forward after accounting for the buffered bytes
-      totalBytes -= lastBufferLength;
-      blobStream->Skip(static_cast<int>(totalBytes));
-      lastBufferLength = 0;
-      lastBuffer = 0;
-    }
-    return numValues;
-  }
-
-  size_t StringDirectColumnReader::computeSize(const int64_t* lengths,
-                                               const char* notNull,
-                                               uint64_t numValues) {
-    size_t totalLength = 0;
-    if (notNull) {
-      for(size_t i=0; i < numValues; ++i) {
-        if (notNull[i]) {
-          totalLength += static_cast<size_t>(lengths[i]);
-        }
-      }
-    } else {
-      for(size_t i=0; i < numValues; ++i) {
-        totalLength += static_cast<size_t>(lengths[i]);
-      }
-    }
-    return totalLength;
-  }
-
-  void StringDirectColumnReader::next(ColumnVectorBatch& rowBatch,
-                                      uint64_t numValues,
-                                      char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    // update the notNull from the parent class
-    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0;
-    StringVectorBatch& byteBatch = dynamic_cast<StringVectorBatch&>(rowBatch);
-    char **startPtr = byteBatch.data.data();
-    int64_t *lengthPtr = byteBatch.length.data();
-
-    // Read the length vector. The decoder is given the notNull mask, and
-    // the length entries of null rows are never consulted below.
-    lengthRle->next(lengthPtr, numValues, notNull);
-
-    // figure out the total length of data we need from the blob stream
-    const size_t totalLength = computeSize(lengthPtr, notNull, numValues);
-
-    // Load data from the blob stream into our buffer until we have enough
-    // to get the rest directly out of the stream's buffer. The buffer is
-    // sized for totalLength once, up front, so ptr stays valid here.
-    size_t bytesBuffered = 0;
-    blobBuffer.resize(totalLength);
-    char *ptr= blobBuffer.data();
-    while (bytesBuffered + lastBufferLength < totalLength) {
-      if (lastBufferLength > 0) {
-        memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
-      }
-      bytesBuffered += lastBufferLength;
-      const void* readBuffer;
-      int readLength;
-      if (!blobStream->Next(&readBuffer, &readLength)) {
-        throw ParseError("failed to read in StringDirectColumnReader.next");
-      }
-      lastBuffer = static_cast<const char*>(readBuffer);
-      lastBufferLength = static_cast<size_t>(readLength);
-    }
-
-    // Set up the start pointers for the ones that will come out of the buffer.
-    size_t filledSlots = 0;
-    size_t usedBytes = 0;
-    ptr = blobBuffer.data();
-    if (notNull) {
-      // Null rows occupy no bytes, so step over them without testing their
-      // meaningless length entry. Stopping on a null row would make the
-      // completion step below use that bogus length.
-      while (filledSlots < numValues &&
-             (!notNull[filledSlots] ||
-              usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
-              bytesBuffered)) {
-        if (notNull[filledSlots]) {
-          startPtr[filledSlots] = ptr + usedBytes;
-          usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
-        }
-        filledSlots += 1;
-      }
-    } else {
-      while (filledSlots < numValues &&
-             (usedBytes + static_cast<size_t>(lengthPtr[filledSlots]) <=
-              bytesBuffered)) {
-        startPtr[filledSlots] = ptr + usedBytes;
-        usedBytes += static_cast<size_t>(lengthPtr[filledSlots]);
-        filledSlots += 1;
-      }
-    }
-
-    // do we need to complete the last value in the blob buffer?
-    // (filledSlots now names a non-null row that straddles the boundary.)
-    if (usedBytes < bytesBuffered) {
-      size_t moreBytes = static_cast<size_t>(lengthPtr[filledSlots]) -
-        (bytesBuffered - usedBytes);
-      blobBuffer.resize(bytesBuffered + moreBytes);
-      ptr = blobBuffer.data();
-      memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
-      lastBuffer += moreBytes;
-      lastBufferLength -= moreBytes;
-      startPtr[filledSlots++] = ptr + usedBytes;
-    }
-
-    // Finally, set up any remaining entries into the stream buffer
-    if (notNull) {
-      while (filledSlots < numValues) {
-        if (notNull[filledSlots]) {
-          startPtr[filledSlots] = const_cast<char*>(lastBuffer);
-          lastBuffer += lengthPtr[filledSlots];
-          lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
-        }
-        filledSlots += 1;
-      }
-    } else {
-      while (filledSlots < numValues) {
-        startPtr[filledSlots] = const_cast<char*>(lastBuffer);
-        lastBuffer += lengthPtr[filledSlots];
-        lastBufferLength -= static_cast<size_t>(lengthPtr[filledSlots]);
-        filledSlots += 1;
-      }
-    }
-  }
-
-  /**
-   * Reader for struct columns. It owns one reader per selected child
-   * column and forwards skip/next to each of them.
-   */
-  class StructColumnReader: public ColumnReader {
-  private:
-    // Readers for the selected children, in subtype order. They are owned
-    // through unique_ptr, so readers already built are released if a later
-    // buildReader() call throws inside the constructor.
-    std::vector<std::unique_ptr<ColumnReader>> children;
-
-  public:
-    StructColumnReader(const Type& type, StripeStreams& stripe);
-    ~StructColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-  };
-
-  StructColumnReader::StructColumnReader(const Type& type,
-                                         StripeStreams& stripe
-                                         ): ColumnReader(type, stripe) {
-    // Build a reader for every selected child column; only DIRECT
-    // encoding is accepted for the struct itself.
-    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
-    switch (static_cast<int64_t>(stripe.getEncoding(columnId).kind())) {
-    case proto::ColumnEncoding_Kind_DIRECT:
-      for(unsigned int i=0; i < type.getSubtypeCount(); ++i) {
-        const Type& child = type.getSubtype(i);
-        if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
-          children.push_back(buildReader(child, stripe));
-        }
-      }
-      break;
-    case proto::ColumnEncoding_Kind_DIRECT_V2:
-    case proto::ColumnEncoding_Kind_DICTIONARY:
-    case proto::ColumnEncoding_Kind_DICTIONARY_V2:
-    default:
-      throw ParseError("Unknown encoding for StructColumnReader");
-    }
-  }
-
-  StructColumnReader::~StructColumnReader() {
-    // children are released by their unique_ptr owners
-  }
-
-  uint64_t StructColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-    for (std::unique_ptr<ColumnReader>& childReader : children) {
-      childReader->skip(numValues);
-    }
-    return numValues;
-  }
-
-  void StructColumnReader::next(ColumnVectorBatch& rowBatch,
-                                uint64_t numValues,
-                                char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    notNull = rowBatch.hasNulls? rowBatch.notNull.data() : 0;
-    // Child i fills field i of the struct batch.
-    for (size_t i = 0; i < children.size(); ++i) {
-      children[i]->next(*(dynamic_cast<StructVectorBatch&>(rowBatch).fields[i]),
-                        numValues, notNull);
-    }
-  }
-
-  /**
-   * Reader for list columns. The LENGTH stream gives each non-null row's
-   * element count. next() turns those counts into start offsets and, when
-   * the element column was selected, reads the elements into the child
-   * batch.
-   */
-  class ListColumnReader: public ColumnReader {
-  public:
-    ListColumnReader(const Type& type, StripeStreams& stripe);
-    ~ListColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-
-  private:
-    // Reader for the element column; null when it was not selected.
-    std::unique_ptr<ColumnReader> child;
-    std::unique_ptr<RleDecoder> rle;
-  };
-
-  ListColumnReader::ListColumnReader(const Type& type,
-                                     StripeStreams& stripe)
-    : ColumnReader(type, stripe) {
-    const std::vector<bool> selected = stripe.getSelectedColumns();
-    const RleVersion version =
-      convertRleVersion(stripe.getEncoding(columnId).kind());
-    rle = createRleDecoder(stripe.getStream(columnId,
-                                            proto::Stream_Kind_LENGTH,
-                                            true),
-                           false, version, memoryPool);
-    const Type& elementType = type.getSubtype(0);
-    if (selected[static_cast<uint64_t>(elementType.getColumnId())]) {
-      child = buildReader(elementType, stripe);
-    }
-  }
-
-  ListColumnReader::~ListColumnReader() {
-    // Intentionally empty destructor body.
-  }
-
-  uint64_t ListColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-    ColumnReader* elementReader = child.get();
-    if (elementReader == nullptr) {
-      // Without an element reader only the lengths need to be consumed.
-      rle->skip(numValues);
-      return numValues;
-    }
-    // Sum the list lengths so the element reader skips exactly that many.
-    const uint64_t BUFFER_SIZE = 1024;
-    int64_t lengths[BUFFER_SIZE];
-    uint64_t elementCount = 0;
-    uint64_t consumed = 0;
-    while (consumed < numValues) {
-      const uint64_t batch = std::min(numValues - consumed, BUFFER_SIZE);
-      rle->next(lengths, batch, nullptr);
-      for (uint64_t k = 0; k < batch; ++k) {
-        elementCount += static_cast<size_t>(lengths[k]);
-      }
-      consumed += batch;
-    }
-    elementReader->skip(elementCount);
-    return numValues;
-  }
-
-  void ListColumnReader::next(ColumnVectorBatch& rowBatch,
-                              uint64_t numValues,
-                              char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    ListVectorBatch& listBatch = dynamic_cast<ListVectorBatch&>(rowBatch);
-    int64_t* offsets = listBatch.offsets.data();
-    char* mask = listBatch.hasNulls ? listBatch.notNull.data() : nullptr;
-    rle->next(offsets, numValues, mask);
-    // Rewrite per-row lengths as start offsets; null rows add nothing.
-    uint64_t running = 0;
-    for (size_t i = 0; i < numValues; ++i) {
-      const bool present = (mask == nullptr) || mask[i];
-      const uint64_t length = present ? static_cast<uint64_t>(offsets[i]) : 0;
-      offsets[i] = static_cast<int64_t>(running);
-      running += length;
-    }
-    offsets[numValues] = static_cast<int64_t>(running);
-    if (ColumnReader* elementReader = child.get()) {
-      elementReader->next(*(listBatch.elements.get()), running, nullptr);
-    }
-  }
-
-  class MapColumnReader: public ColumnReader {
-  private:
-    std::unique_ptr<ColumnReader> keyReader;
-    std::unique_ptr<ColumnReader> elementReader;
-    std::unique_ptr<RleDecoder> rle;
-
-  public:
-    MapColumnReader(const Type& type, StripeStreams& stipe);
-    ~MapColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-  };
-
-  MapColumnReader::MapColumnReader(const Type& type,
-                                   StripeStreams& stripe
-                                   ): ColumnReader(type, stripe) {
-    // Determine if the key and/or value columns are selected
-    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
-    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
-    rle = createRleDecoder(stripe.getStream(columnId,
-                                            proto::Stream_Kind_LENGTH,
-                                            true),
-                           false, vers, memoryPool);
-    const Type& keyType = type.getSubtype(0);
-    if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
-      keyReader = buildReader(keyType, stripe);
-    }
-    const Type& elementType = type.getSubtype(1);
-    if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) {
-      elementReader = buildReader(elementType, stripe);
-    }
-  }
-
-  MapColumnReader::~MapColumnReader() {
-    // PASS
-  }
-
-  uint64_t MapColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-    ColumnReader *rawKeyReader = keyReader.get();
-    ColumnReader *rawElementReader = elementReader.get();
-    if (rawKeyReader || rawElementReader) {
-      const uint64_t BUFFER_SIZE = 1024;
-      int64_t buffer[BUFFER_SIZE];
-      uint64_t childrenElements = 0;
-      uint64_t lengthsRead = 0;
-      while (lengthsRead < numValues) {
-        uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
-        rle->next(buffer, chunk, 0);
-        for(size_t i=0; i < chunk; ++i) {
-          childrenElements += static_cast<size_t>(buffer[i]);
-        }
-        lengthsRead += chunk;
-      }
-      if (rawKeyReader) {
-        rawKeyReader->skip(childrenElements);
-      }
-      if (rawElementReader) {
-        rawElementReader->skip(childrenElements);
-      }
-    } else {
-      rle->skip(numValues);
-    }
-    return numValues;
-  }
-
-  void MapColumnReader::next(ColumnVectorBatch& rowBatch,
-                             uint64_t numValues,
-                             char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    MapVectorBatch &mapBatch = dynamic_cast<MapVectorBatch&>(rowBatch);
-    int64_t* offsets = mapBatch.offsets.data();
-    notNull = mapBatch.hasNulls ? mapBatch.notNull.data() : 0;
-    rle->next(offsets, numValues, notNull);
-    uint64_t totalChildren = 0;
-    if (notNull) {
-      for(size_t i=0; i < numValues; ++i) {
-        if (notNull[i]) {
-          uint64_t tmp = static_cast<uint64_t>(offsets[i]);
-          offsets[i] = static_cast<int64_t>(totalChildren);
-          totalChildren += tmp;
-        } else {
-          offsets[i] = static_cast<int64_t>(totalChildren);
-        }
-      }
-    } else {
-      for(size_t i=0; i < numValues; ++i) {
-        uint64_t tmp = static_cast<uint64_t>(offsets[i]);
-        offsets[i] = static_cast<int64_t>(totalChildren);
-        totalChildren += tmp;
-      }
-    }
-    offsets[numValues] = static_cast<int64_t>(totalChildren);
-    ColumnReader *rawKeyReader = keyReader.get();
-    if (rawKeyReader) {
-      rawKeyReader->next(*(mapBatch.keys.get()), totalChildren, 0);
-    }
-    ColumnReader *rawElementReader = elementReader.get();
-    if (rawElementReader) {
-      rawElementReader->next(*(mapBatch.elements.get()), totalChildren, 0);
-    }
-  }
-
-  class UnionColumnReader: public ColumnReader {
-  private:
-    std::unique_ptr<ByteRleDecoder> rle;
-    std::vector<ColumnReader*> childrenReader;
-    std::vector<int64_t> childrenCounts;
-    uint64_t numChildren;
-
-  public:
-    UnionColumnReader(const Type& type, StripeStreams& stipe);
-    ~UnionColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-  };
-
-  UnionColumnReader::UnionColumnReader(const Type& type,
-                                       StripeStreams& stripe
-                                       ): ColumnReader(type, stripe) {
-    numChildren = type.getSubtypeCount();
-    childrenReader.resize(numChildren);
-    childrenCounts.resize(numChildren);
-
-    rle = createByteRleDecoder(stripe.getStream(columnId,
-                                                proto::Stream_Kind_DATA,
-                                                true));
-    // figure out which types are selected
-    const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
-    for(unsigned int i=0; i < numChildren; ++i) {
-      const Type &child = type.getSubtype(i);
-      if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
-        childrenReader[i] = buildReader(child, stripe).release();
-      }
-    }
-  }
-
-  UnionColumnReader::~UnionColumnReader() {
-    for(std::vector<ColumnReader*>::iterator itr = childrenReader.begin();
-        itr != childrenReader.end(); ++itr) {
-      delete *itr;
-    }
-  }
-
-  uint64_t UnionColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-    const uint64_t BUFFER_SIZE = 1024;
-    char buffer[BUFFER_SIZE];
-    uint64_t lengthsRead = 0;
-    int64_t *counts = childrenCounts.data();
-    memset(counts, 0, sizeof(int64_t) * numChildren);
-    while (lengthsRead < numValues) {
-      uint64_t chunk = std::min(numValues - lengthsRead, BUFFER_SIZE);
-      rle->next(buffer, chunk, 0);
-      for(size_t i=0; i < chunk; ++i) {
-        counts[static_cast<size_t>(buffer[i])] += 1;
-      }
-      lengthsRead += chunk;
-    }
-    for(size_t i=0; i < numChildren; ++i) {
-      if (counts[i] != 0 && childrenReader[i] != NULL) {
-        childrenReader[i]->skip(static_cast<uint64_t>(counts[i]));
-      }
-    }
-    return numValues;
-  }
-
-  void UnionColumnReader::next(ColumnVectorBatch& rowBatch,
-                               uint64_t numValues,
-                               char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    UnionVectorBatch &unionBatch = dynamic_cast<UnionVectorBatch&>(rowBatch);
-    uint64_t* offsets = unionBatch.offsets.data();
-    int64_t* counts = childrenCounts.data();
-    memset(counts, 0, sizeof(int64_t) * numChildren);
-    unsigned char* tags = unionBatch.tags.data();
-    notNull = unionBatch.hasNulls ? unionBatch.notNull.data() : 0;
-    rle->next(reinterpret_cast<char *>(tags), numValues, notNull);
-    // set the offsets for each row
-    if (notNull) {
-      for(size_t i=0; i < numValues; ++i) {
-        if (notNull[i]) {
-          offsets[i] =
-            static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
-        }
-      }
-    } else {
-      for(size_t i=0; i < numValues; ++i) {
-        offsets[i] =
-          static_cast<uint64_t>(counts[static_cast<size_t>(tags[i])]++);
-      }
-    }
-    // read the right number of each child column
-    for(size_t i=0; i < numChildren; ++i) {
-      if (childrenReader[i] != nullptr) {
-        childrenReader[i]->next(*(unionBatch.children[i]),
-                                static_cast<uint64_t>(counts[i]), nullptr);
-      }
-    }
-  }
-
-  /**
-   * Destructively convert the number from zigzag encoding to the
-   * natural signed representation.
-   */
-  void unZigZagInt128(Int128& value) {
-    bool needsNegate = value.getLowBits() & 1;
-    value >>= 1;
-    if (needsNegate) {
-      value.negate();
-      value -= 1;
-    }
-  }
-
-  class Decimal64ColumnReader: public ColumnReader {
-  public:
-    static const uint32_t MAX_PRECISION_64 = 18;
-    static const uint32_t MAX_PRECISION_128 = 38;
-    static const int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1];
-
-  protected:
-    std::unique_ptr<SeekableInputStream> valueStream;
-    int32_t precision;
-    int32_t scale;
-    const char* buffer;
-    const char* bufferEnd;
-
-    std::unique_ptr<RleDecoder> scaleDecoder;
-
-    /**
-     * Read the valueStream for more bytes.
-     */
-    void readBuffer() {
-      while (buffer == bufferEnd) {
-        int length;
-        if (!valueStream->Next(reinterpret_cast<const void**>(&buffer),
-                               &length)) {
-          throw ParseError("Read past end of stream in Decimal64ColumnReader "+
-                           valueStream->getName());
-        }
-        bufferEnd = buffer + length;
-      }
-    }
-
-    void readInt64(int64_t& value, int32_t currentScale) {
-      value = 0;
-      size_t offset = 0;
-      while (true) {
-        readBuffer();
-        unsigned char ch = static_cast<unsigned char>(*(buffer++));
-        value |= static_cast<uint64_t>(ch & 0x7f) << offset;
-        offset += 7;
-        if (!(ch & 0x80)) {
-          break;
-        }
-      }
-      value = unZigZag(static_cast<uint64_t>(value));
-      if (scale > currentScale) {
-        value *= POWERS_OF_TEN[scale - currentScale];
-      } else if (scale < currentScale) {
-        value /= POWERS_OF_TEN[currentScale - scale];
-      }
-    }
-
-  public:
-    Decimal64ColumnReader(const Type& type, StripeStreams& stipe);
-    ~Decimal64ColumnReader();
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-  };
-  const uint32_t Decimal64ColumnReader::MAX_PRECISION_64;
-  const uint32_t Decimal64ColumnReader::MAX_PRECISION_128;
-  const int64_t Decimal64ColumnReader::POWERS_OF_TEN[MAX_PRECISION_64 + 1]=
-    {1,
-     10,
-     100,
-     1000,
-     10000,
-     100000,
-     1000000,
-     10000000,
-     100000000,
-     1000000000,
-     10000000000,
-     100000000000,
-     1000000000000,
-     10000000000000,
-     100000000000000,
-     1000000000000000,
-     10000000000000000,
-     100000000000000000,
-     1000000000000000000};
-
-  Decimal64ColumnReader::Decimal64ColumnReader(const Type& type,
-                                               StripeStreams& stripe
-                                               ): ColumnReader(type, stripe) {
-    scale = static_cast<int32_t>(type.getScale());
-    precision = static_cast<int32_t>(type.getPrecision());
-    valueStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-    buffer = nullptr;
-    bufferEnd = nullptr;
-    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
-    scaleDecoder = createRleDecoder(stripe.getStream
-                                    (columnId,
-                                     proto::Stream_Kind_SECONDARY,
-                                     true),
-                                    true, vers, memoryPool);
-  }
-
-  Decimal64ColumnReader::~Decimal64ColumnReader() {
-    // PASS
-  }
-
-  uint64_t Decimal64ColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-    uint64_t skipped = 0;
-    while (skipped < numValues) {
-      readBuffer();
-      if (!(0x80 & *(buffer++))) {
-        skipped += 1;
-      }
-    }
-    scaleDecoder->skip(numValues);
-    return numValues;
-  }
-
-  void Decimal64ColumnReader::next(ColumnVectorBatch& rowBatch,
-                                   uint64_t numValues,
-                                   char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0;
-    Decimal64VectorBatch &batch =
-      dynamic_cast<Decimal64VectorBatch&>(rowBatch);
-    int64_t* values = batch.values.data();
-    // read the next group of scales
-    int64_t* scaleBuffer = batch.readScales.data();
-    scaleDecoder->next(scaleBuffer, numValues, notNull);
-    batch.precision = precision;
-    batch.scale = scale;
-    if (notNull) {
-      for(size_t i=0; i < numValues; ++i) {
-        if (notNull[i]) {
-          readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
-        }
-      }
-    } else {
-      for(size_t i=0; i < numValues; ++i) {
-        readInt64(values[i], static_cast<int32_t>(scaleBuffer[i]));
-      }
-    }
-  }
-
-  void scaleInt128(Int128& value, uint32_t scale, uint32_t currentScale) {
-    if (scale > currentScale) {
-      while(scale > currentScale) {
-        uint32_t scaleAdjust =
-          std::min(Decimal64ColumnReader::MAX_PRECISION_64,
-                   scale - currentScale);
-        value *= Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust];
-        currentScale += scaleAdjust;
-      }
-    } else if (scale < currentScale) {
-      Int128 remainder;
-      while(currentScale > scale) {
-        uint32_t scaleAdjust =
-          std::min(Decimal64ColumnReader::MAX_PRECISION_64,
-                   currentScale - scale);
-        value = value.divide(Decimal64ColumnReader::POWERS_OF_TEN[scaleAdjust],
-                             remainder);
-        currentScale -= scaleAdjust;
-      }
-    }
-  }
-
-  class Decimal128ColumnReader: public Decimal64ColumnReader {
-  public:
-    Decimal128ColumnReader(const Type& type, StripeStreams& stipe);
-    ~Decimal128ColumnReader();
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-
-  private:
-    void readInt128(Int128& value, int32_t currentScale) {
-      value = 0;
-      Int128 work;
-      uint32_t offset = 0;
-      while (true) {
-        readBuffer();
-        unsigned char ch = static_cast<unsigned char>(*(buffer++));
-        work = ch & 0x7f;
-        work <<= offset;
-        value |=  work;
-        offset += 7;
-        if (!(ch & 0x80)) {
-          break;
-        }
-      }
-      unZigZagInt128(value);
-      scaleInt128(value, static_cast<uint32_t>(scale), 
-                  static_cast<uint32_t>(currentScale));
-    }
-  };
-
-  Decimal128ColumnReader::Decimal128ColumnReader
-                (const Type& type,
-                 StripeStreams& stripe
-                 ): Decimal64ColumnReader(type, stripe) {
-    // PASS
-  }
-
-  Decimal128ColumnReader::~Decimal128ColumnReader() {
-    // PASS
-  }
-
-  void Decimal128ColumnReader::next(ColumnVectorBatch& rowBatch,
-                                   uint64_t numValues,
-                                   char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0;
-    Decimal128VectorBatch &batch =
-      dynamic_cast<Decimal128VectorBatch&>(rowBatch);
-    Int128* values = batch.values.data();
-    // read the next group of scales
-    int64_t* scaleBuffer = batch.readScales.data();
-    scaleDecoder->next(scaleBuffer, numValues, notNull);
-    batch.precision = precision;
-    batch.scale = scale;
-    if (notNull) {
-      for(size_t i=0; i < numValues; ++i) {
-        if (notNull[i]) {
-          readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
-        }
-      }
-    } else {
-      for(size_t i=0; i < numValues; ++i) {
-        readInt128(values[i], static_cast<int32_t>(scaleBuffer[i]));
-      }
-    }
-  }
-
-  class DecimalHive11ColumnReader: public Decimal64ColumnReader {
-  private:
-    bool throwOnOverflow;
-    std::ostream* errorStream;
-
-    /**
-     * Read an Int128 from the stream and correct it to the desired scale.
-     */
-    bool readInt128(Int128& value, int32_t currentScale) {
-      // -/+ 99999999999999999999999999999999999999
-      static const Int128 MIN_VALUE(-0x4b3b4ca85a86c47b, 0xf675ddc000000001);
-      static const Int128 MAX_VALUE( 0x4b3b4ca85a86c47a, 0x098a223fffffffff);
-
-      value = 0;
-      Int128 work;
-      uint32_t offset = 0;
-      bool result = true;
-      while (true) {
-        readBuffer();
-        unsigned char ch = static_cast<unsigned char>(*(buffer++));
-        work = ch & 0x7f;
-        // If we have read more than 128 bits, we flag the error, but keep
-        // reading bytes so the stream isn't thrown off.
-        if (offset > 128 || (offset == 126 && work > 3)) {
-          result = false;
-        }
-        work <<= offset;
-        value |=  work;
-        offset += 7;
-        if (!(ch & 0x80)) {
-          break;
-        }
-      }
-
-      if (!result) {
-        return result;
-      }
-      unZigZagInt128(value);
-      scaleInt128(value, static_cast<uint32_t>(scale),
-                  static_cast<uint32_t>(currentScale));
-      return value >= MIN_VALUE && value <= MAX_VALUE;
-    }
-
-  public:
-    DecimalHive11ColumnReader(const Type& type, StripeStreams& stipe);
-    ~DecimalHive11ColumnReader();
-
-    void next(ColumnVectorBatch& rowBatch,
-              uint64_t numValues,
-              char *notNull) override;
-  };
-
-  DecimalHive11ColumnReader::DecimalHive11ColumnReader
-                    (const Type& type,
-                     StripeStreams& stripe
-                     ): Decimal64ColumnReader(type, stripe) {
-    const ReaderOptions options = stripe.getReaderOptions();
-    scale = options.getForcedScaleOnHive11Decimal();
-    throwOnOverflow = options.getThrowOnHive11DecimalOverflow();
-    errorStream = options.getErrorStream();
-  }
-
-  DecimalHive11ColumnReader::~DecimalHive11ColumnReader() {
-    // PASS
-  }
-
-  void DecimalHive11ColumnReader::next(ColumnVectorBatch& rowBatch,
-                                       uint64_t numValues,
-                                       char *notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : 0;
-    Decimal128VectorBatch &batch =
-      dynamic_cast<Decimal128VectorBatch&>(rowBatch);
-    Int128* values = batch.values.data();
-    // read the next group of scales
-    int64_t* scaleBuffer = batch.readScales.data();
-
-    scaleDecoder->next(scaleBuffer, numValues, notNull);
-
-    batch.precision = precision;
-    batch.scale = scale;
-    if (notNull) {
-      for(size_t i=0; i < numValues; ++i) {
-        if (notNull[i]) {
-          if (!readInt128(values[i],
-                          static_cast<int32_t>(scaleBuffer[i]))) {
-            if (throwOnOverflow) {
-              throw ParseError("Hive 0.11 decimal was more than 38 digits.");
-            } else {
-              *errorStream << "Warning: "
-                           << "Hive 0.11 decimal with more than 38 digits "
-                           << "replaced by NULL.\n";
-              notNull[i] = false;
-            }
-          }
-        }
-      }
-    } else {
-      for(size_t i=0; i < numValues; ++i) {
-        if (!readInt128(values[i],
-                        static_cast<int32_t>(scaleBuffer[i]))) {
-          if (throwOnOverflow) {
-            throw ParseError("Hive 0.11 decimal was more than 38 digits.");
-          } else {
-            *errorStream << "Warning: "
-                         << "Hive 0.11 decimal with more than 38 digits "
-                         << "replaced by NULL.\n";
-            batch.hasNulls = true;
-            batch.notNull[i] = false;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Create a reader for the given stripe.
-   */
-  std::unique_ptr<ColumnReader> buildReader(const Type& type,
-                                            StripeStreams& stripe) {
-    switch (static_cast<int64_t>(type.getKind())) {
-    case DATE:
-    case INT:
-    case LONG:
-    case SHORT:
-      return std::unique_ptr<ColumnReader>(
-          new IntegerColumnReader(type, stripe));
-    case BINARY:
-    case CHAR:
-    case STRING:
-    case VARCHAR:
-      switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())){
-      case proto::ColumnEncoding_Kind_DICTIONARY:
-      case proto::ColumnEncoding_Kind_DICTIONARY_V2:
-        return std::unique_ptr<ColumnReader>(
-            new StringDictionaryColumnReader(type, stripe));
-      case proto::ColumnEncoding_Kind_DIRECT:
-      case proto::ColumnEncoding_Kind_DIRECT_V2:
-        return std::unique_ptr<ColumnReader>(
-            new StringDirectColumnReader(type, stripe));
-      default:
-        throw NotImplementedYet("buildReader unhandled string encoding");
-      }
-
-    case BOOLEAN:
-      return std::unique_ptr<ColumnReader>(
-          new BooleanColumnReader(type, stripe));
-
-    case BYTE:
-      return std::unique_ptr<ColumnReader>(
-          new ByteColumnReader(type, stripe));
-
-    case LIST:
-      return std::unique_ptr<ColumnReader>(
-          new ListColumnReader(type, stripe));
-
-    case MAP:
-      return std::unique_ptr<ColumnReader>(
-          new MapColumnReader(type, stripe));
-
-    case UNION:
-      return std::unique_ptr<ColumnReader>(
-          new UnionColumnReader(type, stripe));
-
-    case STRUCT:
-      return std::unique_ptr<ColumnReader>(
-          new StructColumnReader(type, stripe));
-
-    case FLOAT:
-    case DOUBLE:
-      return std::unique_ptr<ColumnReader>(
-          new DoubleColumnReader(type, stripe));
-
-    case TIMESTAMP:
-      return std::unique_ptr<ColumnReader>
-        (new TimestampColumnReader(type, stripe));
-
-    case DECIMAL:
-      // is this a Hive 0.11 or 0.12 file?
-      if (type.getPrecision() == 0) {
-        return std::unique_ptr<ColumnReader>
-          (new DecimalHive11ColumnReader(type, stripe));
-
-      // can we represent the values using int64_t?
-      } else if (type.getPrecision() <=
-                 Decimal64ColumnReader::MAX_PRECISION_64) {
-        return std::unique_ptr<ColumnReader>
-          (new Decimal64ColumnReader(type, stripe));
-
-      // otherwise we use the Int128 implementation
-      } else {
-        return std::unique_ptr<ColumnReader>
-          (new Decimal128ColumnReader(type, stripe));
-      }
-
-    default:
-      throw NotImplementedYet("buildReader unhandled type");
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/7f55b453/c++/src/orc/ColumnReader.hh
----------------------------------------------------------------------
diff --git a/c++/src/orc/ColumnReader.hh b/c++/src/orc/ColumnReader.hh
deleted file mode 100644
index b90c942..0000000
--- a/c++/src/orc/ColumnReader.hh
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef ORC_COLUMN_READER_HH
-#define ORC_COLUMN_READER_HH
-
-#include "orc/Vector.hh"
-#include "ByteRLE.hh"
-#include "Compression.hh"
-#include "wrap/orc-proto-wrapper.hh"
-
-namespace orc {
-
-  class StripeStreams {
-  public:
-    virtual ~StripeStreams();
-
-    /**
-     * Get the reader options.
-     */
-    virtual const ReaderOptions& getReaderOptions() const = 0;
-
-    /**
-     * Get the array of booleans for which columns are selected.
-     * @return the address of an array which contains true at the index of
-     *    each columnId is selected.
-     */
-    virtual const std::vector<bool> getSelectedColumns() const = 0;
-
-    /**
-     * Get the encoding for the given column for this stripe.
-     */
-    virtual proto::ColumnEncoding getEncoding(int64_t columnId) const = 0;
-
-    /**
-     * Get the stream for the given column/kind in this stripe.
-     * @param columnId the id of the column
-     * @param kind the kind of the stream
-     * @param shouldStream should the reading page the stream in
-     * @return the new stream
-     */
-    virtual std::unique_ptr<SeekableInputStream>
-                    getStream(int64_t columnId,
-                              proto::Stream_Kind kind,
-                              bool shouldStream) const = 0;
-
-    /**
-     * Get the memory pool for this reader.
-     */
-    virtual MemoryPool& getMemoryPool() const = 0;
-  };
-
-  /**
-   * The interface for reading ORC data types.
-   */
-  class ColumnReader {
-  protected:
-    std::unique_ptr<ByteRleDecoder> notNullDecoder;
-    int64_t columnId;
-    MemoryPool& memoryPool;
-
-  public:
-    ColumnReader(const Type& type, StripeStreams& stipe);
-
-    virtual ~ColumnReader();
-
-    /**
-     * Skip number of specified rows.
-     * @param numValues the number of values to skip
-     * @return the number of non-null values skipped
-     */
-    virtual uint64_t skip(uint64_t numValues);
-
-    /**
-     * Read the next group of values into this rowBatch.
-     * @param rowBatch the memory to read into.
-     * @param numValues the number of values to read
-     * @param notNull if null, all values are not null. Otherwise, it is
-     *           a mask (with at least numValues bytes) for which values to
-     *           set.
-     */
-    virtual void next(ColumnVectorBatch& rowBatch,
-                      uint64_t numValues,
-                      char* notNull);
-  };
-
-  /**
-   * Create a reader for the given stripe.
-   */
-  std::unique_ptr<ColumnReader> buildReader(const Type& type,
-                                            StripeStreams& stripe);
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/orc/blob/7f55b453/c++/src/orc/Compression.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/Compression.cc b/c++/src/orc/Compression.cc
deleted file mode 100644
index 06e10e0..0000000
--- a/c++/src/orc/Compression.cc
+++ /dev/null
@@ -1,751 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "orc/Adaptor.hh"
-#include "Compression.hh"
-#include "Exceptions.hh"
-
-#include <algorithm>
-#include <iomanip>
-#include <iostream>
-#include <limits>
-#include <sstream>
-
-#include "zlib.h"
-
-#include "wrap/snappy-wrapper.h"
-
-namespace orc {
-
-  /**
-   * Write a hex dump of buffer to out: 24 bytes per line, each line prefixed
-   * with its 7-digit hex offset. The stream's format flags and fill character
-   * are restored afterwards.
-   */
-  void printBuffer(std::ostream& out,
-                   const char *buffer,
-                   uint64_t length) {
-    const uint64_t width = 24;
-    // Save the caller's formatting: the dump switches to hex with '0' fill
-    // and must not leak either setting into later output on this stream.
-    const std::ios_base::fmtflags savedFlags = out.flags();
-    const char savedFill = out.fill();
-    out << std::hex;
-    for (uint64_t offset = 0; offset < length; offset += width) {
-      out << std::setfill('0') << std::setw(7) << offset;
-      const uint64_t end = std::min(offset + width, length);
-      for (uint64_t pos = offset; pos < end; ++pos) {
-        out << " " << std::setfill('0') << std::setw(2)
-            << static_cast<uint64_t>(0xff & buffer[pos]);
-      }
-      out << "\n";
-    }
-    out.flags(savedFlags);
-    out.fill(savedFill);
-  }
-
-  PositionProvider::PositionProvider(const std::list<uint64_t>& posns)
-      : position(posns.begin()) {
-    // Only an iterator into posns is kept, so posns must outlive this object.
-  }
-
-  uint64_t PositionProvider::next() {
-    // Yield the current value and advance the cursor. There is no end check:
-    // callers must not ask for more values than the list holds.
-    return *position++;
-  }
-
-  // Out-of-line destructors; neither class has anything to release here.
-  SeekableInputStream::~SeekableInputStream() {
-  }
-
-  SeekableArrayInputStream::~SeekableArrayInputStream() {
-  }
-
-  // The unsigned-byte overload only reinterprets the pointer and forwards.
-  SeekableArrayInputStream::SeekableArrayInputStream
-               (const unsigned char* values,
-                uint64_t size,
-                int64_t blkSize
-                ): SeekableArrayInputStream(
-                     reinterpret_cast<const char*>(values), size, blkSize) {
-  }
-
-  // Borrow [values, values + size); blkSize == -1 means one Next() call
-  // returns everything that remains.
-  SeekableArrayInputStream::SeekableArrayInputStream(const char* values,
-                                                     uint64_t size,
-                                                     int64_t blkSize
-                                                     ): data(values),
-                                                        length(size),
-                                                        position(0),
-                                                        blockSize(
-                                                          blkSize == -1 ?
-                                                          size :
-                                                          static_cast<uint64_t>(blkSize)) {
-  }
-
-  /**
-   * Return the next chunk of the array: at most blockSize bytes, and never
-   * more than fits in the int used for sizes by the ZeroCopyInputStream
-   * interface (a >2 GiB block previously overflowed static_cast<int>).
-   * @return false with *size == 0 once the array is exhausted
-   */
-  bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
-    const uint64_t maxChunk =
-      static_cast<uint64_t>(std::numeric_limits<int>::max());
-    const uint64_t currentSize =
-      std::min(std::min(length - position, blockSize), maxChunk);
-    if (currentSize > 0) {
-      *buffer = data + position;
-      *size = static_cast<int>(currentSize);
-      position += currentSize;
-      return true;
-    }
-    *size = 0;
-    return false;
-  }
-
-  void SeekableArrayInputStream::BackUp(int count) {
-    // Negative counts are silently ignored.
-    if (count < 0) {
-      return;
-    }
-    const uint64_t distance = static_cast<uint64_t>(count);
-    // At most one block, and never before the start of the array.
-    if (distance > blockSize || distance > position) {
-      throw std::logic_error("Can't backup that much!");
-    }
-    position -= distance;
-  }
-
-  bool SeekableArrayInputStream::Skip(int count) {
-    if (count < 0) {
-      return false;
-    }
-    const uint64_t distance = static_cast<uint64_t>(count);
-    if (distance + position > length) {
-      // Short skip: park at the end and report failure.
-      position = length;
-      return false;
-    }
-    position += distance;
-    return true;
-  }
-
-  google::protobuf::int64 SeekableArrayInputStream::ByteCount() const {
-    // The current offset is exactly the number of bytes consumed so far.
-    const google::protobuf::int64 consumed =
-      static_cast<google::protobuf::int64>(position);
-    return consumed;
-  }
-
-  /**
-   * Move to the offset supplied by seekPosition.
-   * @throws std::logic_error if the offset lies past the end of the array;
-   *         the stream is then left positioned at the end.
-   */
-  void SeekableArrayInputStream::seek(PositionProvider& seekPosition) {
-    position = seekPosition.next();
-    // Without this check Next() would compute length - position, wrap
-    // around, and hand out memory beyond the array.
-    if (position > length) {
-      position = length;
-      throw std::logic_error("seek too far");
-    }
-  }
-
-  std::string SeekableArrayInputStream::getName() const {
-    // e.g. "SeekableArrayInputStream 10 of 100"
-    std::ostringstream description;
-    description << "SeekableArrayInputStream " << position
-                << " of " << length;
-    return description.str();
-  }
-
-  // Resolve a requested block size: negative selects the 256 KiB default,
-  // and the result never exceeds the stream length.
-  static uint64_t computeBlock(int64_t request, uint64_t length) {
-    const uint64_t wanted = request < 0 ? static_cast<uint64_t>(256 * 1024)
-                                        : static_cast<uint64_t>(request);
-    return wanted < length ? wanted : length;
-  }
-
-  // Reads [offset, offset + byteCount) of stream, one block per Next() call.
-  SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
-                                                   uint64_t offset,
-                                                   uint64_t byteCount,
-                                                   MemoryPool& _pool,
-                                                   int64_t _blockSize
-                                                   ):pool(_pool),
-                                                     input(stream),
-                                                     start(offset),
-                                                     length(byteCount),
-                                                     blockSize(computeBlock
-                                                               (_blockSize,
-                                                                byteCount)),
-                                                     buffer(new DataBuffer<char>(pool)),
-                                                     position(0),
-                                                     pushBack(0) {
-  }
-
-  // Nothing to do: buffer is released by its unique_ptr, input is not owned.
-  SeekableFileInputStream::~SeekableFileInputStream() {
-  }
-
-  bool SeekableFileInputStream::Next(const void** data, int*size) {
-    uint64_t bytesRead = pushBack;
-    if (bytesRead != 0) {
-      // Replay the tail of the current buffer that BackUp() gave back.
-      *data = buffer->data() + (buffer->size() - bytesRead);
-    } else {
-      // Fetch a fresh block (possibly empty at end of range).
-      bytesRead = std::min(length - position, blockSize);
-      buffer->resize(bytesRead);
-      if (bytesRead > 0) {
-        input->read(buffer->data(), bytesRead, start + position);
-        *data = static_cast<void*>(buffer->data());
-      }
-    }
-    pushBack = 0;
-    position += bytesRead;
-    *size = static_cast<int>(bytesRead);
-    return bytesRead > 0;
-  }
-
-  /**
-   * Return the last signedCount bytes of the previous Next() so the next
-   * call hands them out again.
-   * @throws std::logic_error on a negative count, on a second BackUp without
-   *         an intervening Next, or when backing up past the buffered data
-   */
-  void SeekableFileInputStream::BackUp(int signedCount) {
-    if (signedCount < 0) {
-      throw std::logic_error("can't backup negative distances");
-    }
-    const uint64_t count = static_cast<uint64_t>(signedCount);
-    if (pushBack > 0) {
-      throw std::logic_error("can't backup unless we just called Next");
-    }
-    // Bound by the bytes actually held in buffer, not by blockSize: the last
-    // read may be shorter than a block (end of range), and Next() replays
-    // buffer->data() + (buffer->size() - pushBack), which must not underflow.
-    if (count > buffer->size() || count > position) {
-      throw std::logic_error("can't backup that far");
-    }
-    pushBack = count;
-    position -= pushBack;
-  }
-
-  /**
-   * Skip signedCount bytes.
-   * @return true if the full count was skipped (landing exactly on the end
-   *         counts as success, matching SeekableArrayInputStream::Skip);
-   *         false for a negative count or if the range ended first, in which
-   *         case the stream is left at the end
-   */
-  bool SeekableFileInputStream::Skip(int signedCount) {
-    if (signedCount < 0) {
-      return false;
-    }
-    const uint64_t count = static_cast<uint64_t>(signedCount);
-    // Pushed-back bytes were already subtracted from position, so they are
-    // simply skipped along with the rest.
-    pushBack = 0;
-    // position <= length always holds here, so the subtraction cannot wrap.
-    if (count > length - position) {
-      position = length;
-      return false;
-    }
-    position += count;
-    return true;
-  }
-
-  int64_t SeekableFileInputStream::ByteCount() const {
-    // Offset within the range == bytes consumed so far.
-    const int64_t consumed = static_cast<int64_t>(position);
-    return consumed;
-  }
-
-  void SeekableFileInputStream::seek(PositionProvider& location) {
-    const uint64_t target = location.next();
-    if (target > length) {
-      // Park at the end before reporting the bad offset.
-      position = length;
-      throw std::logic_error("seek too far");
-    }
-    position = target;
-    pushBack = 0;
-  }
-
-  std::string SeekableFileInputStream::getName() const {
-    // e.g. "<input name> from 100 for 50"
-    std::ostringstream description;
-    description << input->getName() << " from " << start
-                << " for " << length;
-    return description.str();
-  }
-
-  // Decoder state shared by the zlib and snappy decompression streams.
-  enum DecompressState {
-    DECOMPRESS_HEADER,    // the next step reads a 3-byte chunk header
-    DECOMPRESS_START,     // the current chunk is compressed
-    DECOMPRESS_CONTINUE,  // not assigned anywhere in this file
-    DECOMPRESS_ORIGINAL,  // the current chunk is stored uncompressed
-    DECOMPRESS_EOF        // the underlying input is exhausted
-  };
-
-  /**
-   * Decompresses an ORC zlib stream. The input is a sequence of chunks, each
-   * preceded by a 3-byte little-endian header: bit 0 set means the chunk is
-   * stored uncompressed, the remaining bits give the chunk length.
-   * Compressed chunks are inflated into buffer.
-   */
-  class ZlibDecompressionStream: public SeekableInputStream {
-  public:
-    ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
-                            size_t blockSize,
-                            MemoryPool& pool);
-    virtual ~ZlibDecompressionStream();
-    virtual bool Next(const void** data, int*size) override;
-    virtual void BackUp(int count) override;
-    virtual bool Skip(int count) override;
-    virtual int64_t ByteCount() const override;
-    virtual void seek(PositionProvider& position) override;
-    virtual std::string getName() const override;
-
-  private:
-    // Fetch the next buffer from input. At end of input, throw ParseError if
-    // failOnEof, otherwise enter DECOMPRESS_EOF with an empty buffer.
-    void readBuffer(bool failOnEof) {
-      int length;
-      if (!input->Next(reinterpret_cast<const void**>(&inputBuffer),
-                       &length)) {
-        if (failOnEof) {
-          throw ParseError("Read past EOF in "
-                           "ZlibDecompressionStream::readBuffer");
-        }
-        state = DECOMPRESS_EOF;
-        inputBuffer = nullptr;
-        inputBufferEnd = nullptr;
-      } else {
-        inputBufferEnd = inputBuffer + length;
-      }
-    }
-
-    // Return the next input byte, refilling the buffer as needed; returns 0
-    // if the refill hit end of input (state is then DECOMPRESS_EOF).
-    uint32_t readByte(bool failOnEof) {
-      if (inputBuffer == inputBufferEnd) {
-        readBuffer(failOnEof);
-        if (state == DECOMPRESS_EOF) {
-          return 0;
-        }
-      }
-      return static_cast<unsigned char>(*(inputBuffer++));
-    }
-
-    // Read a chunk header. Only a missing first byte is a clean end of
-    // stream; running out of input after that throws.
-    void readHeader() {
-      uint32_t header = readByte(false);
-      if (state != DECOMPRESS_EOF) {
-        header |= readByte(true) << 8;
-        header |= readByte(true) << 16;
-        if (header & 1) {
-          state = DECOMPRESS_ORIGINAL;
-        } else {
-          state = DECOMPRESS_START;
-        }
-        remainingLength = header >> 1;
-      } else {
-        remainingLength = 0;
-      }
-    }
-
-    MemoryPool& pool;
-    // size of buffer and the output limit handed to inflate()
-    const size_t blockSize;
-    std::unique_ptr<SeekableInputStream> input;
-    z_stream zstream;
-    // destination for inflated data
-    DataBuffer<char> buffer;
-
-    // the current state
-    DecompressState state;
-
-    // the start of the current buffer
-    // This pointer is not owned by us. It is either owned by zstream or
-    // the underlying stream.
-    const char* outputBuffer;
-    // the size of the current buffer
-    size_t outputBufferLength;
-    // the size of the current chunk
-    size_t remainingLength;
-
-    // the last buffer returned from the input
-    const char *inputBuffer;
-    const char *inputBufferEnd;
-
-    // roughly the number of bytes returned. int64_t rather than off_t:
-    // off_t is only 32 bits on some platforms and ByteCount() is int64_t.
-    int64_t bytesReturned;
-  };
-
-DIAGNOSTIC_PUSH
-DIAGNOSTIC_IGNORE("-Wold-style-cast")
-
-  ZlibDecompressionStream::ZlibDecompressionStream
-                   (std::unique_ptr<SeekableInputStream> inStream,
-                    size_t _blockSize,
-                    MemoryPool& _pool
-                    ): pool(_pool),
-                       blockSize(_blockSize),
-                       input(std::move(inStream)),
-                       buffer(pool, _blockSize),
-                       state(DECOMPRESS_HEADER),
-                       outputBuffer(nullptr),
-                       outputBufferLength(0),
-                       remainingLength(0),
-                       inputBuffer(nullptr),
-                       inputBufferEnd(nullptr),
-                       bytesReturned(0) {
-    // No input yet, default allocators, output directed into buffer.
-    zstream.zalloc = Z_NULL;
-    zstream.zfree = Z_NULL;
-    zstream.opaque = Z_NULL;
-    zstream.next_in = Z_NULL;
-    zstream.avail_in = 0;
-    zstream.next_out = reinterpret_cast<Bytef*>(buffer.data());
-    zstream.avail_out = static_cast<uInt>(blockSize);
-
-    // A negative windowBits asks zlib for raw deflate data (no zlib
-    // header or trailer).
-    const int64_t status = inflateInit2(&zstream, -15);
-    if (status == Z_OK) {
-      return;
-    }
-    if (status == Z_MEM_ERROR) {
-      throw std::logic_error("Memory error from inflateInit2");
-    }
-    if (status == Z_VERSION_ERROR) {
-      throw std::logic_error("Version error from inflateInit2");
-    }
-    if (status == Z_STREAM_ERROR) {
-      throw std::logic_error("Stream error from inflateInit2");
-    }
-    throw std::logic_error("Unknown error from inflateInit2");
-  }
-
-DIAGNOSTIC_POP
-
-  ZlibDecompressionStream::~ZlibDecompressionStream() {
-    int64_t result = inflateEnd(&zstream);
-    if (result != Z_OK) {
-      // Destructors must not throw, so report the failure on the diagnostic
-      // stream rather than mixing it into the program's stdout.
-      std::cerr << "Error in ~ZlibDecompressionStream() " << result << "\n";
-    }
-  }
-
-  /**
-   * Return the next piece of decompressed data. Pushed-back bytes are
-   * replayed first; otherwise a chunk header is read when needed. Stored
-   * (ORIGINAL) chunks are returned straight from the input buffer, possibly
-   * in several pieces; compressed chunks are inflated whole into buffer.
-   * @return false once the input ends at a chunk boundary
-   */
-  bool ZlibDecompressionStream::Next(const void** data, int*size) {
-    // if the user pushed back, return them the partial buffer
-    if (outputBufferLength) {
-      *data = outputBuffer;
-      *size = static_cast<int>(outputBufferLength);
-      outputBuffer += outputBufferLength;
-      outputBufferLength = 0;
-      // BackUp() subtracted these bytes from bytesReturned; count them again
-      // now that they are handed out a second time.
-      bytesReturned += *size;
-      return true;
-    }
-    if (state == DECOMPRESS_HEADER || remainingLength == 0) {
-      readHeader();
-    }
-    if (state == DECOMPRESS_EOF) {
-      return false;
-    }
-    if (inputBuffer == inputBufferEnd) {
-      readBuffer(true);
-    }
-    // bytes of the current chunk available without another read
-    size_t availSize =
-      std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
-               remainingLength);
-    if (state == DECOMPRESS_ORIGINAL) {
-      *data = inputBuffer;
-      *size = static_cast<int>(availSize);
-      outputBuffer = inputBuffer + availSize;
-      outputBufferLength = 0;
-    } else if (state == DECOMPRESS_START) {
-      zstream.next_in =
-        reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
-      zstream.avail_in = static_cast<uInt>(availSize);
-      outputBuffer = buffer.data();
-      zstream.next_out =
-        reinterpret_cast<Bytef*>(const_cast<char*>(outputBuffer));
-      zstream.avail_out = static_cast<uInt>(blockSize);
-      if (inflateReset(&zstream) != Z_OK) {
-        throw std::logic_error("Bad inflateReset in "
-                               "ZlibDecompressionStream::Next");
-      }
-      int64_t result;
-      do {
-        // Z_FINISH once the rest of the chunk is in hand.
-        result = inflate(&zstream, availSize == remainingLength ? Z_FINISH :
-                         Z_SYNC_FLUSH);
-        switch (result) {
-        case Z_OK:
-          // This piece was consumed; feed the next input buffer.
-          remainingLength -= availSize;
-          inputBuffer += availSize;
-          readBuffer(true);
-          availSize =
-            std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
-                     remainingLength);
-          zstream.next_in =
-            reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
-          zstream.avail_in = static_cast<uInt>(availSize);
-          break;
-        case Z_STREAM_END:
-          break;
-        case Z_BUF_ERROR:
-          throw std::logic_error("Buffer error in "
-                                 "ZlibDecompressionStream::Next");
-        case Z_DATA_ERROR:
-          throw std::logic_error("Data error in "
-                                 "ZlibDecompressionStream::Next");
-        case Z_STREAM_ERROR:
-          throw std::logic_error("Stream error in "
-                                 "ZlibDecompressionStream::Next");
-        default:
-          throw std::logic_error("Unknown error in "
-                                 "ZlibDecompressionStream::Next");
-        }
-      } while (result != Z_STREAM_END);
-      *size = static_cast<int>(blockSize - zstream.avail_out);
-      *data = outputBuffer;
-      outputBufferLength = 0;
-      outputBuffer += *size;
-    } else {
-      throw std::logic_error("Unknown compression state in "
-                             "ZlibDecompressionStream::Next");
-    }
-    // Account for the final piece of input consumed by this call.
-    inputBuffer += availSize;
-    remainingLength -= availSize;
-    bytesReturned += *size;
-    return true;
-  }
-
-  /**
-   * Give back the last count bytes returned by Next().
-   * @throws std::logic_error without a preceding Next(), after a second
-   *         BackUp(), or for a negative count
-   */
-  void ZlibDecompressionStream::BackUp(int count) {
-    if (outputBuffer == nullptr || outputBufferLength != 0) {
-      throw std::logic_error("Backup without previous Next in "
-                             "ZlibDecompressionStream");
-    }
-    // A negative count would wrap to a huge size_t and make the next Next()
-    // hand out memory far outside the last returned buffer.
-    if (count < 0) {
-      throw std::logic_error("Negative backup in ZlibDecompressionStream");
-    }
-    outputBuffer -= static_cast<size_t>(count);
-    outputBufferLength = static_cast<size_t>(count);
-    bytesReturned -= count;
-  }
-
-  /**
-   * Skip count decompressed bytes by decompressing and discarding them.
-   * @return false if the stream ended first
-   */
-  bool ZlibDecompressionStream::Skip(int count) {
-    // bytesReturned is maintained by Next() and BackUp(); adding count here
-    // as well counted every skipped byte twice.
-    // this is a stupid implementation for now.
-    // should skip entire blocks without decompressing
-    while (count > 0) {
-      const void *ptr;
-      int len;
-      if (!Next(&ptr, &len)) {
-        return false;
-      }
-      if (len > count) {
-        BackUp(len - count);
-        count = 0;
-      } else {
-        count -= len;
-      }
-    }
-    return true;
-  }
-
-  int64_t ZlibDecompressionStream::ByteCount() const {
-    // Approximate count of decompressed bytes handed out so far.
-    const int64_t handedOut = bytesReturned;
-    return handedOut;
-  }
-
-  /**
-   * Reposition the stream: the underlying input consumes its positions
-   * first, then the next value is the number of decompressed bytes to skip.
-   * @throws ParseError if that skip runs past the end of the data
-   */
-  void ZlibDecompressionStream::seek(PositionProvider& position) {
-    // Drop everything buffered from the old position. Otherwise Next() would
-    // replay a pushed-back buffer, or keep inflating the old chunk from a
-    // stale inputBuffer, after the underlying stream has moved.
-    state = DECOMPRESS_HEADER;
-    outputBuffer = nullptr;
-    outputBufferLength = 0;
-    remainingLength = 0;
-    inputBuffer = nullptr;
-    inputBufferEnd = nullptr;
-
-    input->seek(position);
-    bytesReturned = input->ByteCount();
-    if (!Skip(static_cast<int>(position.next()))) {
-      throw ParseError("Bad skip in ZlibDecompressionStream::seek");
-    }
-  }
-
-  std::string ZlibDecompressionStream::getName() const {
-    // Wrap the underlying stream's name: "zlib(<input>)".
-    std::ostringstream description;
-    description << "zlib(" << input->getName() << ")";
-    return description.str();
-  }
-
-  /**
-   * Decompresses an ORC snappy stream. Chunks use the same 3-byte
-   * little-endian header as the zlib stream (bit 0 set: stored uncompressed;
-   * remaining bits: chunk length). Each compressed chunk is gathered into
-   * one contiguous block and decoded whole into outputBuffer.
-   */
-  class SnappyDecompressionStream: public SeekableInputStream {
-  public:
-    SnappyDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
-                              size_t blockSize,
-                              MemoryPool& pool);
-
-    virtual ~SnappyDecompressionStream() {}
-    virtual bool Next(const void** data, int*size) override;
-    virtual void BackUp(int count) override;
-    virtual bool Skip(int count) override;
-    virtual int64_t ByteCount() const override;
-    virtual void seek(PositionProvider& position) override;
-    virtual std::string getName() const override;
-
-  private:
-    // Fetch the next buffer from input. At end of input, throw ParseError if
-    // failOnEof, otherwise enter DECOMPRESS_EOF with an empty buffer.
-    void readBuffer(bool failOnEof) {
-      int length;
-      if (!input->Next(reinterpret_cast<const void**>(&inputBufferPtr),
-                       &length)) {
-        if (failOnEof) {
-          throw ParseError("SnappyDecompressionStream read past EOF");
-        }
-        state = DECOMPRESS_EOF;
-        inputBufferPtr = nullptr;
-        inputBufferPtrEnd = nullptr;
-      } else {
-        inputBufferPtrEnd = inputBufferPtr + length;
-      }
-    }
-
-    // Return the next input byte, refilling the buffer as needed; returns 0
-    // if the refill hit end of input (state is then DECOMPRESS_EOF).
-    uint32_t readByte(bool failOnEof) {
-      if (inputBufferPtr == inputBufferPtrEnd) {
-        readBuffer(failOnEof);
-        if (state == DECOMPRESS_EOF) {
-          return 0;
-        }
-      }
-      return static_cast<unsigned char>(*(inputBufferPtr++));
-    }
-
-    // Read a chunk header. Only a missing first byte is a clean end of
-    // stream; running out of input after that throws.
-    void readHeader() {
-      uint32_t header = readByte(false);
-      if (state != DECOMPRESS_EOF) {
-        header |= readByte(true) << 8;
-        header |= readByte(true) << 16;
-        if (header & 1) {
-          state = DECOMPRESS_ORIGINAL;
-        } else {
-          state = DECOMPRESS_START;
-        }
-        remainingLength = header >> 1;
-      } else {
-        remainingLength = 0;
-      }
-    }
-
-    std::unique_ptr<SeekableInputStream> input;
-    MemoryPool& pool;
-
-    // may need to stitch together multiple input buffers;
-    // to give snappy a contiguous block
-    DataBuffer<char> inputBuffer;
-
-    // uncompressed output
-    DataBuffer<char> outputBuffer;
-
-    // the current state
-    DecompressState state;
-
-    // the start of the current output buffer
-    const char* outputBufferPtr;
-    // the size of the current output buffer
-    size_t outputBufferLength;
-
-    // the size of the current chunk
-    size_t remainingLength;
-
-    // the last buffer returned from the input
-    const char *inputBufferPtr;
-    const char *inputBufferPtrEnd;
-
-    // bytes returned by this stream. int64_t rather than off_t: off_t is
-    // only 32 bits on some platforms and ByteCount() is int64_t.
-    int64_t bytesReturned;
-  };
-
-  // Both buffers are sized from bufferSize; inputBuffer may grow later when
-  // a compressed chunk spans several input buffers.
-  SnappyDecompressionStream::SnappyDecompressionStream
-                   (std::unique_ptr<SeekableInputStream> inStream,
-                    size_t bufferSize,
-                    MemoryPool& _pool
-                    ) : input(std::move(inStream)),
-                        pool(_pool),
-                        inputBuffer(pool, bufferSize),
-                        outputBuffer(pool, bufferSize),
-                        state(DECOMPRESS_HEADER),
-                        outputBufferPtr(nullptr),
-                        outputBufferLength(0),
-                        remainingLength(0),
-                        inputBufferPtr(nullptr),
-                        inputBufferPtrEnd(nullptr),
-                        bytesReturned(0) {
-  }
-
-  /**
-   * Return the next piece of decompressed data. Pushed-back bytes are
-   * replayed first; otherwise a chunk header is read when needed. Stored
-   * (ORIGINAL) chunks are returned straight from the input buffer, possibly
-   * in several pieces; compressed chunks are gathered into one contiguous
-   * block and decoded whole into outputBuffer.
-   * @return false once the input ends at a chunk boundary
-   */
-  bool SnappyDecompressionStream::Next(const void** data, int*size) {
-    // if the user pushed back, return them the partial buffer
-    if (outputBufferLength) {
-      *data = outputBufferPtr;
-      *size = static_cast<int>(outputBufferLength);
-      outputBufferPtr += outputBufferLength;
-      bytesReturned += outputBufferLength;
-      outputBufferLength = 0;
-      return true;
-    }
-    if (state == DECOMPRESS_HEADER || remainingLength == 0) {
-      readHeader();
-    }
-    if (state == DECOMPRESS_EOF) {
-      return false;
-    }
-    if (inputBufferPtr == inputBufferPtrEnd) {
-      readBuffer(true);
-    }
-
-    // bytes of the current chunk available without another read
-    size_t availSize =
-      std::min(static_cast<size_t>(inputBufferPtrEnd - inputBufferPtr),
-               remainingLength);
-    if (state == DECOMPRESS_ORIGINAL) {
-      // Stored chunk: hand out the input bytes directly; any rest of the
-      // chunk is returned by later calls.
-      *data = inputBufferPtr;
-      *size = static_cast<int>(availSize);
-      outputBufferPtr = inputBufferPtr + availSize;
-      outputBufferLength = 0;
-      inputBufferPtr += availSize;
-      remainingLength -= availSize;
-    } else if (state == DECOMPRESS_START) {
-      // Get contiguous bytes of compressed block.
-      const char *compressed = inputBufferPtr;
-      if (remainingLength == availSize) {
-          inputBufferPtr += availSize;
-      } else {
-        // Did not read enough from input.
-        // Copy the chunk into inputBuffer, pulling further input buffers
-        // until all remainingLength bytes are present.
-        if (inputBuffer.capacity() < remainingLength) {
-          inputBuffer.resize(remainingLength);
-        }
-        ::memcpy(inputBuffer.data(), inputBufferPtr, availSize);
-        inputBufferPtr += availSize;
-        compressed = inputBuffer.data();
-
-        for (size_t pos = availSize; pos < remainingLength; ) {
-          readBuffer(true);
-          size_t avail =
-              std::min(static_cast<size_t>(inputBufferPtrEnd - inputBufferPtr),
-                       remainingLength - pos);
-          ::memcpy(inputBuffer.data() + pos, inputBufferPtr, avail);
-          pos += avail;
-          inputBufferPtr += avail;
-        }
-      }
-
-      if (!snappy::GetUncompressedLength(compressed, remainingLength,
-                                         &outputBufferLength)) {
-        throw ParseError("SnappyDecompressionStream choked on corrupt input");
-      }
-
-      // The decoded chunk must fit in outputBuffer's existing capacity.
-      if (outputBufferLength > outputBuffer.capacity()) {
-        throw std::logic_error("uncompressed length exceeds block size");
-      }
-
-      if (!snappy::RawUncompress(compressed, remainingLength,
-                                 outputBuffer.data())) {
-        throw ParseError("SnappyDecompressionStream choked on corrupt input");
-      }
-
-      // The whole chunk was consumed, so the next call reads a header.
-      remainingLength = 0;
-      state = DECOMPRESS_HEADER;
-      *data = outputBuffer.data();
-      *size = static_cast<int>(outputBufferLength);
-      outputBufferPtr = outputBuffer.data() + outputBufferLength;
-      outputBufferLength = 0;
-    }
-    // NOTE(review): there is no else branch (the zlib stream throws there);
-    // readHeader() only ever sets ORIGINAL, START or EOF, so *size is set
-    // before it is read below.
-
-    bytesReturned += *size;
-    return true;
-  }
-
-  /**
-   * Give back the last count bytes returned by Next().
-   * @throws std::logic_error without a preceding Next(), after a second
-   *         BackUp(), or for a negative count
-   */
-  void SnappyDecompressionStream::BackUp(int count) {
-    if (outputBufferPtr == nullptr || outputBufferLength != 0) {
-      throw std::logic_error("Backup without previous Next in "
-                             "SnappyDecompressionStream");
-    }
-    // A negative count would wrap to a huge size_t and make the next Next()
-    // hand out memory far outside the last returned buffer.
-    if (count < 0) {
-      throw std::logic_error("Negative backup in SnappyDecompressionStream");
-    }
-    outputBufferPtr -= static_cast<size_t>(count);
-    outputBufferLength = static_cast<size_t>(count);
-    bytesReturned -= count;
-  }
-
-  /**
-   * Skip count decompressed bytes by decompressing and discarding them.
-   * @return false if the stream ended first
-   */
-  bool SnappyDecompressionStream::Skip(int count) {
-    // bytesReturned is maintained by Next() and BackUp(); adding count here
-    // as well counted every skipped byte twice.
-    // this is a stupid implementation for now.
-    // should skip entire blocks without decompressing
-    while (count > 0) {
-      const void *ptr;
-      int len;
-      if (!Next(&ptr, &len)) {
-        return false;
-      }
-      if (len > count) {
-        BackUp(len - count);
-        count = 0;
-      } else {
-        count -= len;
-      }
-    }
-    return true;
-  }
-
-  int64_t SnappyDecompressionStream::ByteCount() const {
-    // Decompressed bytes handed out so far.
-    const int64_t handedOut = bytesReturned;
-    return handedOut;
-  }
-
-  /**
-   * Reposition the stream: the underlying input consumes its positions
-   * first, then the next value is the number of decompressed bytes to skip.
-   * @throws ParseError if that skip runs past the end of the data
-   */
-  void SnappyDecompressionStream::seek(PositionProvider& position) {
-    // Drop everything buffered from the old position. Otherwise Next() would
-    // replay a pushed-back buffer, or keep returning the old stored chunk
-    // from a stale inputBufferPtr, after the underlying stream has moved.
-    state = DECOMPRESS_HEADER;
-    outputBufferPtr = nullptr;
-    outputBufferLength = 0;
-    remainingLength = 0;
-    inputBufferPtr = nullptr;
-    inputBufferPtrEnd = nullptr;
-
-    input->seek(position);
-    if (!Skip(static_cast<int>(position.next()))) {
-      throw ParseError("Bad skip in SnappyDecompressionStream::seek");
-    }
-  }
-
-  std::string SnappyDecompressionStream::getName() const {
-    // Wrap the underlying stream's name: "snappy(<input>)".
-    std::ostringstream description;
-    description << "snappy(" << input->getName() << ")";
-    return description.str();
-  }
-
-  std::unique_ptr<SeekableInputStream>
-     createDecompressor(CompressionKind kind,
-                        std::unique_ptr<SeekableInputStream> input,
-                        uint64_t blockSize,
-                        MemoryPool& pool) {
-    std::unique_ptr<SeekableInputStream> result;
-    switch (static_cast<int64_t>(kind)) {
-    case CompressionKind_NONE:
-      // No codec: the raw stream is returned as is.
-      result = std::move(input);
-      break;
-    case CompressionKind_ZLIB:
-      result.reset(new ZlibDecompressionStream(std::move(input), blockSize,
-                                               pool));
-      break;
-    case CompressionKind_SNAPPY:
-      result.reset(new SnappyDecompressionStream(std::move(input), blockSize,
-                                                 pool));
-      break;
-    case CompressionKind_LZO:
-    default:
-      throw NotImplementedYet("compression codec");
-    }
-    return result;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/7f55b453/c++/src/orc/Compression.hh
----------------------------------------------------------------------
diff --git a/c++/src/orc/Compression.hh b/c++/src/orc/Compression.hh
deleted file mode 100644
index 222dc54..0000000
--- a/c++/src/orc/Compression.hh
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef ORC_COMPRESSION_HH
-#define ORC_COMPRESSION_HH
-
-#include "orc/Adaptor.hh"
-#include "orc/OrcFile.hh"
-#include "wrap/zero-copy-stream-wrapper.h"
-
-#include <list>
-#include <vector>
-#include <fstream>
-#include <iostream>
-#include <sstream>
-#include <memory>
-
-namespace orc {
-
-  /**
-   * Write a hex dump of the first length bytes of buffer to out, 24 bytes
-   * per line, each line prefixed with its offset.
-   */
-  void printBuffer(std::ostream& out, const char *buffer, uint64_t length);
-
-  /**
-   * Cursor over a list of stream positions; each next() returns the
-   * following value. Only an iterator is stored, so the list must outlive
-   * the provider, and next() does not check for the end of the list.
-   */
-  class PositionProvider {
-  public:
-    PositionProvider(const std::list<uint64_t>& positions);
-    uint64_t next();
-
-  private:
-    std::list<uint64_t>::const_iterator position;
-  };
-
-  /**
-   * protobuf's ZeroCopyInputStream extended with seek(). Deriving from the
-   * protobuf interface lets these streams be handed straight to the protobuf
-   * parsers.
-   */
-  class SeekableInputStream: public google::protobuf::io::ZeroCopyInputStream {
-  public:
-    virtual ~SeekableInputStream();
-    /** Reposition the stream using values read from position. */
-    virtual void seek(PositionProvider& position) = 0;
-    /** Human-readable description of the stream. */
-    virtual std::string getName() const = 0;
-  };
-
-  /**
-   * Seekable input stream over a caller-owned memory range. The bytes are
-   * not copied, so the range must outlive the stream.
-   */
-  class SeekableArrayInputStream: public SeekableInputStream {
-  private:
-    const char* data;      // start of the borrowed range
-    uint64_t length;       // bytes in the range
-    uint64_t position;     // offset of the next byte to hand out
-    uint64_t blockSize;    // most bytes returned by one Next()
-
-  public:
-    /**
-     * @param values start of the bytes to read
-     * @param length number of bytes at values
-     * @param blkSize most bytes per Next(); -1 means the whole range
-     */
-    SeekableArrayInputStream(const unsigned char* values,
-                             uint64_t length,
-                             int64_t blkSize = -1);
-    SeekableArrayInputStream(const char* values,
-                             uint64_t length,
-                             int64_t blkSize = -1);
-    virtual ~SeekableArrayInputStream();
-
-    // ZeroCopyInputStream interface
-    virtual bool Next(const void** data, int*size) override;
-    virtual void BackUp(int count) override;
-    virtual bool Skip(int count) override;
-    virtual google::protobuf::int64 ByteCount() const override;
-
-    // SeekableInputStream interface
-    virtual void seek(PositionProvider& position) override;
-    virtual std::string getName() const override;
-  };
-
-  /**
-   * Seekable input stream over the byte range [offset, offset + byteCount)
-   * of an InputStream, read one block at a time into a pool-allocated buffer.
-   */
-  class SeekableFileInputStream: public SeekableInputStream {
-  private:
-    MemoryPool& pool;
-    InputStream* const input;                   // not deleted by this class
-    const uint64_t start;                       // range offset within input
-    const uint64_t length;                      // range size
-    const uint64_t blockSize;                   // bytes fetched per read
-    std::unique_ptr<DataBuffer<char> > buffer;  // most recently read block
-    uint64_t position;                          // offset within the range
-    uint64_t pushBack;                          // bytes given back by BackUp
-
-  public:
-    /**
-     * @param input the stream to read from
-     * @param offset start of the range within input
-     * @param byteCount length of the range
-     * @param pool allocator for the read buffer
-     * @param blockSize bytes per read; negative selects the default
-     */
-    SeekableFileInputStream(InputStream* input,
-                            uint64_t offset,
-                            uint64_t byteCount,
-                            MemoryPool& pool,
-                            int64_t blockSize = -1);
-    virtual ~SeekableFileInputStream();
-
-    // ZeroCopyInputStream interface
-    virtual bool Next(const void** data, int*size) override;
-    virtual void BackUp(int count) override;
-    virtual bool Skip(int count) override;
-    virtual int64_t ByteCount() const override;
-
-    // SeekableInputStream interface
-    virtual void seek(PositionProvider& position) override;
-    virtual std::string getName() const override;
-  };
-
-  /**
-   * Wrap input in a decompressor for the given codec.
-   * @param kind the compression codec of the underlying data
-   * @param input the underlying (possibly compressed) stream; ownership
-   *              moves into the result
-   * @param bufferSize the maximum size of a decompressed chunk
-   * @param pool the memory pool for the decompressor's buffers
-   * @return input itself for CompressionKind_NONE, otherwise a wrapper
-   * @throws NotImplementedYet for codecs without a decompressor (e.g. LZO)
-   */
-  std::unique_ptr<SeekableInputStream> createDecompressor(
-      CompressionKind kind,
-      std::unique_ptr<SeekableInputStream> input,
-      uint64_t bufferSize,
-      MemoryPool& pool);
-}
-
-#endif