You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/05/10 20:57:34 UTC

orc git commit: ORC-176: Refactor common classes for writer and reader

Repository: orc
Updated Branches:
  refs/heads/master 90f138b06 -> 77ed66829


ORC-176: Refactor common classes for writer and reader

This is mainly a refactoring change for ORC-176, including:

1. Extracted common classes and functions into Common.hh and Common.cc;
2. Put InputStream interface and its implementations from Compression.hh to InputStream.hh and InputStream.cc.

Fixes #118

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/77ed6682
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/77ed6682
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/77ed6682

Branch: refs/heads/master
Commit: 77ed6682924462643ac8189adee8dae8eed1ae54
Parents: 90f138b
Author: Gang Wu <ga...@alibaba-inc.com>
Authored: Wed May 10 13:43:05 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed May 10 13:56:56 2017 -0700

----------------------------------------------------------------------
 c++/include/orc/Common.hh | 167 +++++++++++++++++++++++++++++++
 c++/include/orc/Reader.hh | 137 +------------------------
 c++/src/CMakeLists.txt    |   2 +
 c++/src/Common.cc         | 107 ++++++++++++++++++++
 c++/src/Compression.cc    | 195 ------------------------------------
 c++/src/Compression.hh    |  92 +----------------
 c++/src/Reader.cc         |  82 ---------------
 c++/src/io/InputStream.cc | 222 +++++++++++++++++++++++++++++++++++++++++
 c++/src/io/InputStream.hh | 116 +++++++++++++++++++++
 9 files changed, 616 insertions(+), 504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/include/orc/Common.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh
new file mode 100644
index 0000000..f499b81
--- /dev/null
+++ b/c++/include/orc/Common.hh
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_COMMON_HH
+#define ORC_COMMON_HH
+
+#include "orc/Vector.hh"
+#include "orc/Type.hh"
+#include "Exceptions.hh"
+#include "wrap/orc-proto-wrapper.hh"
+
+#include <string>
+
+namespace orc {
+  enum CompressionKind {
+    CompressionKind_NONE = 0,
+    CompressionKind_ZLIB = 1,
+    CompressionKind_SNAPPY = 2,
+    CompressionKind_LZO = 3,
+    CompressionKind_LZ4 = 4,
+    CompressionKind_ZSTD = 5,
+    CompressionKind_MAX = INT64_MAX
+  };
+
+  /**
+   * Get the name of the CompressionKind.
+   */
+  std::string compressionKindToString(CompressionKind kind);
+
+  enum WriterVersion {
+    WriterVersion_ORIGINAL = 0,
+    WriterVersion_HIVE_8732 = 1,
+    WriterVersion_HIVE_4243 = 2,
+    WriterVersion_HIVE_12055 = 3,
+    WriterVersion_HIVE_13083 = 4,
+    WriterVersion_ORC_101 = 5,
+    WriterVersion_ORC_135 = 6,
+    WriterVersion_MAX = INT64_MAX
+  };
+
+  /**
+   * Get the name of the WriterVersion.
+   */
+  std::string writerVersionToString(WriterVersion kind);
+
+  enum StreamKind {
+    StreamKind_PRESENT = 0,
+    StreamKind_DATA = 1,
+    StreamKind_LENGTH = 2,
+    StreamKind_DICTIONARY_DATA = 3,
+    StreamKind_DICTIONARY_COUNT = 4,
+    StreamKind_SECONDARY = 5,
+    StreamKind_ROW_INDEX = 6,
+    StreamKind_BLOOM_FILTER = 7
+  };
+
+  /**
+   * Get the string representation of the StreamKind.
+   */
+  std::string streamKindToString(StreamKind kind);
+
+  class StreamInformation {
+  public:
+    virtual ~StreamInformation();
+
+    virtual StreamKind getKind() const = 0;
+    virtual uint64_t getColumnId() const = 0;
+    virtual uint64_t getOffset() const = 0;
+    virtual uint64_t getLength() const = 0;
+  };
+
+  enum ColumnEncodingKind {
+    ColumnEncodingKind_DIRECT = 0,
+    ColumnEncodingKind_DICTIONARY = 1,
+    ColumnEncodingKind_DIRECT_V2 = 2,
+    ColumnEncodingKind_DICTIONARY_V2 = 3
+  };
+
+  std::string columnEncodingKindToString(ColumnEncodingKind kind);
+
+  class StripeInformation {
+  public:
+    virtual ~StripeInformation();
+
+    /**
+     * Get the byte offset of the start of the stripe.
+     * @return the bytes from the start of the file
+     */
+    virtual uint64_t getOffset() const = 0;
+
+    /**
+     * Get the total length of the stripe in bytes.
+     * @return the number of bytes in the stripe
+     */
+    virtual uint64_t getLength() const = 0;
+
+    /**
+     * Get the length of the stripe's indexes.
+     * @return the number of bytes in the index
+     */
+    virtual uint64_t getIndexLength() const = 0;
+
+    /**
+     * Get the length of the stripe's data.
+     * @return the number of bytes in the stripe
+     */
+    virtual uint64_t getDataLength()const = 0;
+
+    /**
+     * Get the length of the stripe's tail section, which contains its index.
+     * @return the number of bytes in the tail
+     */
+    virtual uint64_t getFooterLength() const = 0;
+
+    /**
+     * Get the number of rows in the stripe.
+     * @return a count of the number of rows
+     */
+    virtual uint64_t getNumberOfRows() const = 0;
+
+    /**
+     * Get the number of streams in the stripe.
+     */
+    virtual uint64_t getNumberOfStreams() const = 0;
+
+    /**
+     * Get the StreamInformation for the given stream.
+     */
+    virtual ORC_UNIQUE_PTR<StreamInformation>
+    getStreamInformation(uint64_t streamId) const = 0;
+
+    /**
+     * Get the column encoding for the given column.
+     * @param colId the columnId
+     */
+    virtual ColumnEncodingKind getColumnEncoding(uint64_t colId) const = 0;
+
+    /**
+     * Get the dictionary size.
+     * @param colId the columnId
+     * @return the size of the dictionary or 0 if there isn't one
+     */
+    virtual uint64_t getDictionarySize(uint64_t colId) const = 0;
+
+    /**
+     * Get the writer timezone.
+     */
+    virtual const std::string& getWriterTimezone() const = 0;
+  };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/include/orc/Reader.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 76d7853..3912bd7 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -19,6 +19,7 @@
 #ifndef ORC_READER_HH
 #define ORC_READER_HH
 
+#include "orc/Common.hh"
 #include "orc/orc-config.hh"
 #include "orc/Statistics.hh"
 #include "orc/Type.hh"
@@ -34,142 +35,6 @@ namespace orc {
   struct ReaderOptionsPrivate;
   struct RowReaderOptionsPrivate;
 
-  enum CompressionKind {
-    CompressionKind_NONE = 0,
-    CompressionKind_ZLIB = 1,
-    CompressionKind_SNAPPY = 2,
-    CompressionKind_LZO = 3,
-    CompressionKind_LZ4 = 4,
-    CompressionKind_ZSTD = 5,
-    CompressionKind_MAX = INT64_MAX
-  };
-
-  /**
-   * Get the name of the CompressionKind.
-   */
-  std::string compressionKindToString(CompressionKind kind);
-
-  enum WriterVersion {
-    WriterVersion_ORIGINAL = 0,
-    WriterVersion_HIVE_8732 = 1,
-    WriterVersion_HIVE_4243 = 2,
-    WriterVersion_HIVE_12055 = 3,
-    WriterVersion_HIVE_13083 = 4,
-    WriterVersion_ORC_101 = 5,
-    WriterVersion_ORC_135 = 6,
-    WriterVersion_MAX = INT64_MAX
-  };
-
-  /**
-   * Get the name of the WriterVersion.
-   */
-  std::string writerVersionToString(WriterVersion kind);
-
-  enum StreamKind {
-    StreamKind_PRESENT = 0,
-    StreamKind_DATA = 1,
-    StreamKind_LENGTH = 2,
-    StreamKind_DICTIONARY_DATA = 3,
-    StreamKind_DICTIONARY_COUNT = 4,
-    StreamKind_SECONDARY = 5,
-    StreamKind_ROW_INDEX = 6,
-    StreamKind_BLOOM_FILTER = 7
-  };
-
-  /**
-   * Get the string representation of the StreamKind.
-   */
-  std::string streamKindToString(StreamKind kind);
-
-  class StreamInformation {
-  public:
-    virtual ~StreamInformation();
-
-    virtual StreamKind getKind() const = 0;
-    virtual uint64_t getColumnId() const = 0;
-    virtual uint64_t getOffset() const = 0;
-    virtual uint64_t getLength() const = 0;
-  };
-
-  enum ColumnEncodingKind {
-    ColumnEncodingKind_DIRECT = 0,
-    ColumnEncodingKind_DICTIONARY = 1,
-    ColumnEncodingKind_DIRECT_V2 = 2,
-    ColumnEncodingKind_DICTIONARY_V2 = 3
-  };
-
-  std::string columnEncodingKindToString(ColumnEncodingKind kind);
-
-  class StripeInformation {
-  public:
-    virtual ~StripeInformation();
-
-    /**
-     * Get the byte offset of the start of the stripe.
-     * @return the bytes from the start of the file
-     */
-    virtual uint64_t getOffset() const = 0;
-
-    /**
-     * Get the total length of the stripe in bytes.
-     * @return the number of bytes in the stripe
-     */
-    virtual uint64_t getLength() const = 0;
-
-    /**
-     * Get the length of the stripe's indexes.
-     * @return the number of bytes in the index
-     */
-    virtual uint64_t getIndexLength() const = 0;
-
-    /**
-     * Get the length of the stripe's data.
-     * @return the number of bytes in the stripe
-     */
-    virtual uint64_t getDataLength()const = 0;
-
-    /**
-     * Get the length of the stripe's tail section, which contains its index.
-     * @return the number of bytes in the tail
-     */
-    virtual uint64_t getFooterLength() const = 0;
-
-    /**
-     * Get the number of rows in the stripe.
-     * @return a count of the number of rows
-     */
-    virtual uint64_t getNumberOfRows() const = 0;
-
-    /**
-     * Get the number of streams in the stripe.
-     */
-    virtual uint64_t getNumberOfStreams() const = 0;
-
-    /**
-     * Get the StreamInformation for the given stream.
-     */
-    virtual ORC_UNIQUE_PTR<StreamInformation>
-      getStreamInformation(uint64_t streamId) const = 0;
-
-    /**
-     * Get the column encoding for the given column.
-     * @param colId the columnId
-     */
-    virtual ColumnEncodingKind getColumnEncoding(uint64_t colId) const = 0;
-
-    /**
-     * Get the dictionary size.
-     * @param colId the columnId
-     * @return the size of the dictionary or 0 if there isn't one
-     */
-    virtual uint64_t getDictionarySize(uint64_t colId) const = 0;
-
-    /**
-     * Get the writer timezone.
-     */
-    virtual const std::string& getWriterTimezone() const = 0;
-  };
-
   /**
    * Options for creating a Reader.
    */

http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index d717fb4..c39437a 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -135,10 +135,12 @@ add_custom_command(OUTPUT orc_proto.pb.h orc_proto.pb.cc
 add_library (orc STATIC
   "${CMAKE_CURRENT_BINARY_DIR}/Adaptor.hh"
   orc_proto.pb.h
+  io/InputStream.cc
   wrap/orc-proto-wrapper.cc
   ByteRLE.cc
   ColumnPrinter.cc
   ColumnReader.cc
+  Common.cc
   Compression.cc
   Exceptions.cc
   Int128.cc

http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/Common.cc
----------------------------------------------------------------------
diff --git a/c++/src/Common.cc b/c++/src/Common.cc
new file mode 100644
index 0000000..7813612
--- /dev/null
+++ b/c++/src/Common.cc
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/Common.hh"
+
+#include <sstream>
+
+namespace orc {
+
+  std::string compressionKindToString(CompressionKind kind) {
+    switch (static_cast<int>(kind)) {
+      case CompressionKind_NONE:
+        return "none";
+      case CompressionKind_ZLIB:
+        return "zlib";
+      case CompressionKind_SNAPPY:
+        return "snappy";
+      case CompressionKind_LZO:
+        return "lzo";
+      case CompressionKind_LZ4:
+        return "lz4";
+      case CompressionKind_ZSTD:
+        return "zstd";
+    }
+    std::stringstream buffer;
+    buffer << "unknown - " << kind;
+    return buffer.str();
+  }
+
+  std::string writerVersionToString(WriterVersion version) {
+    switch (static_cast<int>(version)) {
+      case WriterVersion_ORIGINAL:
+        return "original";
+      case WriterVersion_HIVE_8732:
+        return "HIVE-8732";
+      case WriterVersion_HIVE_4243:
+        return "HIVE-4243";
+      case WriterVersion_HIVE_12055:
+        return "HIVE-12055";
+      case WriterVersion_HIVE_13083:
+        return "HIVE-13083";
+      case WriterVersion_ORC_101:
+        return "ORC-101";
+      case WriterVersion_ORC_135:
+        return "ORC-135";
+    }
+    std::stringstream buffer;
+    buffer << "future - " << version;
+    return buffer.str();
+  }
+
+  std::string streamKindToString(StreamKind kind) {
+    switch (static_cast<int>(kind)) {
+      case StreamKind_PRESENT:
+        return "present";
+      case StreamKind_DATA:
+        return "data";
+      case StreamKind_LENGTH:
+        return "length";
+      case StreamKind_DICTIONARY_DATA:
+        return "dictionary";
+      case StreamKind_DICTIONARY_COUNT:
+        return "dictionary count";
+      case StreamKind_SECONDARY:
+        return "secondary";
+      case StreamKind_ROW_INDEX:
+        return "index";
+      case StreamKind_BLOOM_FILTER:
+        return "bloom";
+    }
+    std::stringstream buffer;
+    buffer << "unknown - " << kind;
+    return buffer.str();
+  }
+
+  std::string columnEncodingKindToString(ColumnEncodingKind kind) {
+    switch (static_cast<int>(kind)) {
+      case ColumnEncodingKind_DIRECT:
+        return "direct";
+      case ColumnEncodingKind_DICTIONARY:
+        return "dictionary";
+      case ColumnEncodingKind_DIRECT_V2:
+        return "direct rle2";
+      case ColumnEncodingKind_DICTIONARY_V2:
+        return "dictionary rle2";
+    }
+    std::stringstream buffer;
+    buffer << "unknown - " << kind;
+    return buffer.str();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/Compression.cc
----------------------------------------------------------------------
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 81cc578..e2f9dbf 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -33,201 +33,6 @@
 
 namespace orc {
 
-  void printBuffer(std::ostream& out,
-                   const char *buffer,
-                   uint64_t length) {
-    const uint64_t width = 24;
-    out << std::hex;
-    for(uint64_t line = 0; line < (length + width - 1) / width; ++line) {
-      out << std::setfill('0') << std::setw(7) << (line * width);
-      for(uint64_t byte = 0;
-          byte < width && line * width + byte < length; ++byte) {
-        out << " " << std::setfill('0') << std::setw(2)
-                  << static_cast<uint64_t>(0xff & buffer[line * width +
-                                                             byte]);
-      }
-      out << "\n";
-    }
-    out << std::dec;
-  }
-
-  PositionProvider::PositionProvider(const std::list<uint64_t>& posns) {
-    position = posns.begin();
-  }
-
-  uint64_t PositionProvider::next() {
-    uint64_t result = *position;
-    ++position;
-    return result;
-  }
-
-  SeekableInputStream::~SeekableInputStream() {
-    // PASS
-  }
-
-  SeekableArrayInputStream::~SeekableArrayInputStream() {
-    // PASS
-  }
-
-  SeekableArrayInputStream::SeekableArrayInputStream
-               (const unsigned char* values,
-                uint64_t size,
-                uint64_t blkSize
-                ): data(reinterpret_cast<const char*>(values)) {
-    length = size;
-    position = 0;
-    blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
-  }
-
-  SeekableArrayInputStream::SeekableArrayInputStream(const char* values,
-                                                     uint64_t size,
-                                                     uint64_t blkSize
-                                                     ): data(values) {
-    length = size;
-    position = 0;
-    blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
-  }
-
-  bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
-    uint64_t currentSize = std::min(length - position, blockSize);
-    if (currentSize > 0) {
-      *buffer = data + position;
-      *size = static_cast<int>(currentSize);
-      position += currentSize;
-      return true;
-    }
-    *size = 0;
-    return false;
-  }
-
-  void SeekableArrayInputStream::BackUp(int count) {
-    if (count >= 0) {
-      uint64_t unsignedCount = static_cast<uint64_t>(count);
-      if (unsignedCount <= blockSize && unsignedCount <= position) {
-        position -= unsignedCount;
-      } else {
-        throw std::logic_error("Can't backup that much!");
-      }
-    }
-  }
-
-  bool SeekableArrayInputStream::Skip(int count) {
-    if (count >= 0) {
-      uint64_t unsignedCount = static_cast<uint64_t>(count);
-      if (unsignedCount + position <= length) {
-        position += unsignedCount;
-        return true;
-      } else {
-        position = length;
-      }
-    }
-    return false;
-  }
-
-  google::protobuf::int64 SeekableArrayInputStream::ByteCount() const {
-    return static_cast<google::protobuf::int64>(position);
-  }
-
-  void SeekableArrayInputStream::seek(PositionProvider& seekPosition) {
-    position = seekPosition.next();
-  }
-
-  std::string SeekableArrayInputStream::getName() const {
-    std::ostringstream result;
-    result << "SeekableArrayInputStream " << position << " of " << length;
-    return result.str();
-  }
-
-  static uint64_t computeBlock(uint64_t request, uint64_t length) {
-    return std::min(length, request == 0 ? 256 * 1024 : request);
-  }
-
-  SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
-                                                   uint64_t offset,
-                                                   uint64_t byteCount,
-                                                   MemoryPool& _pool,
-                                                   uint64_t _blockSize
-                                                   ):pool(_pool),
-                                                     input(stream),
-                                                     start(offset),
-                                                     length(byteCount),
-                                                     blockSize(computeBlock
-                                                               (_blockSize,
-                                                                length)) {
-
-    position = 0;
-    buffer.reset(new DataBuffer<char>(pool));
-    pushBack = 0;
-  }
-
-  SeekableFileInputStream::~SeekableFileInputStream() {
-    // PASS
-  }
-
-  bool SeekableFileInputStream::Next(const void** data, int*size) {
-    uint64_t bytesRead;
-    if (pushBack != 0) {
-      *data = buffer->data() + (buffer->size() - pushBack);
-      bytesRead = pushBack;
-    } else {
-      bytesRead = std::min(length - position, blockSize);
-      buffer->resize(bytesRead);
-      if (bytesRead > 0) {
-        input->read(buffer->data(), bytesRead, start+position);
-        *data = static_cast<void*>(buffer->data());
-      }
-    }
-    position += bytesRead;
-    pushBack = 0;
-    *size = static_cast<int>(bytesRead);
-    return bytesRead != 0;
-  }
-
-  void SeekableFileInputStream::BackUp(int signedCount) {
-    if (signedCount < 0) {
-      throw std::logic_error("can't backup negative distances");
-    }
-    uint64_t count = static_cast<uint64_t>(signedCount);
-    if (pushBack > 0) {
-      throw std::logic_error("can't backup unless we just called Next");
-    }
-    if (count > blockSize || count > position) {
-      throw std::logic_error("can't backup that far");
-    }
-    pushBack = static_cast<uint64_t>(count);
-    position -= pushBack;
-  }
-
-  bool SeekableFileInputStream::Skip(int signedCount) {
-    if (signedCount < 0) {
-      return false;
-    }
-    uint64_t count = static_cast<uint64_t>(signedCount);
-    position = std::min(position + count, length);
-    pushBack = 0;
-    return position < length;
-  }
-
-  int64_t SeekableFileInputStream::ByteCount() const {
-    return static_cast<int64_t>(position);
-  }
-
-  void SeekableFileInputStream::seek(PositionProvider& location) {
-    position = location.next();
-    if (position > length) {
-      position = length;
-      throw std::logic_error("seek too far");
-    }
-    pushBack = 0;
-  }
-
-  std::string SeekableFileInputStream::getName() const {
-    std::ostringstream result;
-    result << input->getName() << " from " << start << " for "
-           << length;
-    return result.str();
-  }
-
   enum DecompressState { DECOMPRESS_HEADER,
                          DECOMPRESS_START,
                          DECOMPRESS_CONTINUE,

http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/Compression.hh
----------------------------------------------------------------------
diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh
index efd374a..8c3eda7 100644
--- a/c++/src/Compression.hh
+++ b/c++/src/Compression.hh
@@ -19,100 +19,10 @@
 #ifndef ORC_COMPRESSION_HH
 #define ORC_COMPRESSION_HH
 
-#include "orc/OrcFile.hh"
-
-#include "Adaptor.hh"
-#include "wrap/zero-copy-stream-wrapper.h"
-
-#include <list>
-#include <vector>
-#include <fstream>
-#include <iostream>
-#include <sstream>
-#include <memory>
+#include "io/InputStream.hh"
 
 namespace orc {
 
-  void printBuffer(std::ostream& out,
-                   const char *buffer,
-                   uint64_t length);
-
-  class PositionProvider {
-  private:
-    std::list<uint64_t>::const_iterator position;
-  public:
-    PositionProvider(const std::list<uint64_t>& positions);
-    uint64_t next();
-  };
-
-  /**
-   * A subclass of Google's ZeroCopyInputStream that supports seek.
-   * By extending Google's class, we get the ability to pass it directly
-   * to the protobuf readers.
-   */
-  class SeekableInputStream: public google::protobuf::io::ZeroCopyInputStream {
-  public:
-    virtual ~SeekableInputStream();
-    virtual void seek(PositionProvider& position) = 0;
-    virtual std::string getName() const = 0;
-  };
-
-  /**
-   * Create a seekable input stream based on a memory range.
-   */
-  class SeekableArrayInputStream: public SeekableInputStream {
-  private:
-    const char* data;
-    uint64_t length;
-    uint64_t position;
-    uint64_t blockSize;
-
-  public:
-    SeekableArrayInputStream(const unsigned char* list,
-                             uint64_t length,
-                             uint64_t block_size = 0);
-    SeekableArrayInputStream(const char* list,
-                             uint64_t length,
-                             uint64_t block_size = 0);
-    virtual ~SeekableArrayInputStream();
-    virtual bool Next(const void** data, int*size) override;
-    virtual void BackUp(int count) override;
-    virtual bool Skip(int count) override;
-    virtual google::protobuf::int64 ByteCount() const override;
-    virtual void seek(PositionProvider& position) override;
-    virtual std::string getName() const override;
-  };
-
-  /**
-   * Create a seekable input stream based on an input stream.
-   */
-  class SeekableFileInputStream: public SeekableInputStream {
-  private:
-    MemoryPool& pool;
-    InputStream* const input;
-    const uint64_t start;
-    const uint64_t length;
-    const uint64_t blockSize;
-    std::unique_ptr<DataBuffer<char> > buffer;
-    uint64_t position;
-    uint64_t pushBack;
-
-  public:
-    SeekableFileInputStream(InputStream* input,
-                            uint64_t offset,
-                            uint64_t byteCount,
-                            MemoryPool& pool,
-                            uint64_t blockSize = 0);
-    virtual ~SeekableFileInputStream();
-
-    virtual bool Next(const void** data, int*size) override;
-    virtual void BackUp(int count) override;
-    virtual bool Skip(int count) override;
-    virtual int64_t ByteCount() const override;
-    virtual void seek(PositionProvider& position) override;
-    virtual std::string getName() const override;
-  };
-
   /**
    * Create a decompressor for the given compression kind.
    * @param kind the compression type to implement

http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 9c423bd..fe8608a 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -35,48 +35,6 @@
 
 namespace orc {
 
-  std::string compressionKindToString(CompressionKind kind) {
-    switch (static_cast<int>(kind)) {
-    case CompressionKind_NONE:
-      return "none";
-    case CompressionKind_ZLIB:
-      return "zlib";
-    case CompressionKind_SNAPPY:
-      return "snappy";
-    case CompressionKind_LZO:
-      return "lzo";
-    case CompressionKind_LZ4:
-      return "lz4";
-    case CompressionKind_ZSTD:
-      return "zstd";
-    }
-    std::stringstream buffer;
-    buffer << "unknown - " << kind;
-    return buffer.str();
-  }
-
-  std::string writerVersionToString(WriterVersion version) {
-    switch (static_cast<int>(version)) {
-    case WriterVersion_ORIGINAL:
-      return "original";
-    case WriterVersion_HIVE_8732:
-      return "HIVE-8732";
-    case WriterVersion_HIVE_4243:
-      return "HIVE-4243";
-    case WriterVersion_HIVE_12055:
-      return "HIVE-12055";
-    case WriterVersion_HIVE_13083:
-      return "HIVE-13083";
-    case WriterVersion_ORC_101:
-      return "ORC-101";
-    case WriterVersion_ORC_135:
-      return "ORC-135";
-    }
-    std::stringstream buffer;
-    buffer << "future - " << version;
-    return buffer.str();
-  }
-
   uint64_t getCompressionBlockSize(const proto::PostScript& ps) {
     if (ps.has_compressionblocksize()) {
       return ps.compressionblocksize();
@@ -961,46 +919,6 @@ namespace orc {
                                                   postscriptLength));
   }
 
-  std::string streamKindToString(StreamKind kind) {
-    switch (static_cast<int>(kind)) {
-    case StreamKind_PRESENT:
-      return "present";
-    case StreamKind_DATA:
-      return "data";
-    case StreamKind_LENGTH:
-      return "length";
-    case StreamKind_DICTIONARY_DATA:
-      return "dictionary";
-    case StreamKind_DICTIONARY_COUNT:
-      return "dictionary count";
-    case StreamKind_SECONDARY:
-      return "secondary";
-    case StreamKind_ROW_INDEX:
-      return "index";
-    case StreamKind_BLOOM_FILTER:
-      return "bloom";
-    }
-    std::stringstream buffer;
-    buffer << "unknown - " << kind;
-    return buffer.str();
-  }
-
-  std::string columnEncodingKindToString(ColumnEncodingKind kind) {
-    switch (static_cast<int>(kind)) {
-    case ColumnEncodingKind_DIRECT:
-      return "direct";
-    case ColumnEncodingKind_DICTIONARY:
-      return "dictionary";
-    case ColumnEncodingKind_DIRECT_V2:
-      return "direct rle2";
-    case ColumnEncodingKind_DICTIONARY_V2:
-      return "dictionary rle2";
-    }
-    std::stringstream buffer;
-    buffer << "unknown - " << kind;
-    return buffer.str();
-  }
-
   RowReader::~RowReader() {
     // PASS
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/io/InputStream.cc
----------------------------------------------------------------------
diff --git a/c++/src/io/InputStream.cc b/c++/src/io/InputStream.cc
new file mode 100644
index 0000000..fd91b23
--- /dev/null
+++ b/c++/src/io/InputStream.cc
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exceptions.hh"
+#include "InputStream.hh"
+
+#include <algorithm>
+#include <iomanip>
+
+namespace orc {
+
+  void printBuffer(std::ostream& out,
+                   const char *buffer,
+                   uint64_t length) {
+    const uint64_t width = 24;
+    out << std::hex;
+    for(uint64_t line = 0; line < (length + width - 1) / width; ++line) {
+      out << std::setfill('0') << std::setw(7) << (line * width);
+      for(uint64_t byte = 0;
+          byte < width && line * width + byte < length; ++byte) {
+        out << " " << std::setfill('0') << std::setw(2)
+            << static_cast<uint64_t>(0xff & buffer[line * width +
+                                                   byte]);
+      }
+      out << "\n";
+    }
+    out << std::dec;
+  }
+
+  PositionProvider::PositionProvider(const std::list<uint64_t>& posns) {
+    position = posns.begin();
+  }
+
+  uint64_t PositionProvider::next() {
+    uint64_t result = *position;
+    ++position;
+    return result;
+  }
+
+  SeekableInputStream::~SeekableInputStream() {
+    // PASS
+  }
+
+  SeekableArrayInputStream::~SeekableArrayInputStream() {
+    // PASS
+  }
+
+  SeekableArrayInputStream::SeekableArrayInputStream
+               (const unsigned char* values,
+                uint64_t size,
+                uint64_t blkSize
+               ): data(reinterpret_cast<const char*>(values)) {
+    length = size;
+    position = 0;
+    blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
+  }
+
+  SeekableArrayInputStream::SeekableArrayInputStream(const char* values,
+                                                     uint64_t size,
+                                                     uint64_t blkSize
+  ): data(values) {
+    length = size;
+    position = 0;
+    blockSize = blkSize == 0 ? length : static_cast<uint64_t>(blkSize);
+  }
+
+  bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
+    uint64_t currentSize = std::min(length - position, blockSize);
+    if (currentSize > 0) {
+      *buffer = data + position;
+      *size = static_cast<int>(currentSize);
+      position += currentSize;
+      return true;
+    }
+    *size = 0;
+    return false;
+  }
+
+  void SeekableArrayInputStream::BackUp(int count) {
+    if (count >= 0) {
+      uint64_t unsignedCount = static_cast<uint64_t>(count);
+      if (unsignedCount <= blockSize && unsignedCount <= position) {
+        position -= unsignedCount;
+      } else {
+        throw std::logic_error("Can't backup that much!");
+      }
+    }
+  }
+
+  bool SeekableArrayInputStream::Skip(int count) {
+    if (count >= 0) {
+      uint64_t unsignedCount = static_cast<uint64_t>(count);
+      if (unsignedCount + position <= length) {
+        position += unsignedCount;
+        return true;
+      } else {
+        position = length;
+      }
+    }
+    return false;
+  }
+
+  google::protobuf::int64 SeekableArrayInputStream::ByteCount() const {
+    return static_cast<google::protobuf::int64>(position);
+  }
+
+  void SeekableArrayInputStream::seek(PositionProvider& seekPosition) {
+    position = seekPosition.next();
+  }
+
+  std::string SeekableArrayInputStream::getName() const {
+    std::ostringstream result;
+    result << "SeekableArrayInputStream " << position << " of " << length;
+    return result.str();
+  }
+
+  static uint64_t computeBlock(uint64_t request, uint64_t length) {
+    return std::min(length, request == 0 ? 256 * 1024 : request);
+  }
+
+  SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
+                                                   uint64_t offset,
+                                                   uint64_t byteCount,
+                                                   MemoryPool& _pool,
+                                                   uint64_t _blockSize
+                                                   ):pool(_pool),
+                                                     input(stream),
+                                                     start(offset),
+                                                     length(byteCount),
+                                                     blockSize(computeBlock
+                                                               (_blockSize,
+                                                                length)) {
+
+    position = 0;
+    buffer.reset(new DataBuffer<char>(pool));
+    pushBack = 0;
+  }
+
+  SeekableFileInputStream::~SeekableFileInputStream() {
+    // PASS
+  }
+
+  bool SeekableFileInputStream::Next(const void** data, int*size) {
+    uint64_t bytesRead;
+    if (pushBack != 0) {
+      *data = buffer->data() + (buffer->size() - pushBack);
+      bytesRead = pushBack;
+    } else {
+      bytesRead = std::min(length - position, blockSize);
+      buffer->resize(bytesRead);
+      if (bytesRead > 0) {
+        input->read(buffer->data(), bytesRead, start+position);
+        *data = static_cast<void*>(buffer->data());
+      }
+    }
+    position += bytesRead;
+    pushBack = 0;
+    *size = static_cast<int>(bytesRead);
+    return bytesRead != 0;
+  }
+
+  void SeekableFileInputStream::BackUp(int signedCount) {
+    if (signedCount < 0) {
+      throw std::logic_error("can't backup negative distances");
+    }
+    uint64_t count = static_cast<uint64_t>(signedCount);
+    if (pushBack > 0) {
+      throw std::logic_error("can't backup unless we just called Next");
+    }
+    if (count > blockSize || count > position) {
+      throw std::logic_error("can't backup that far");
+    }
+    pushBack = static_cast<uint64_t>(count);
+    position -= pushBack;
+  }
+
+  bool SeekableFileInputStream::Skip(int signedCount) {
+    if (signedCount < 0) {
+      return false;
+    }
+    uint64_t count = static_cast<uint64_t>(signedCount);
+    position = std::min(position + count, length);
+    pushBack = 0;
+    return position < length;
+  }
+
+  int64_t SeekableFileInputStream::ByteCount() const {
+    return static_cast<int64_t>(position);
+  }
+
+  void SeekableFileInputStream::seek(PositionProvider& location) {
+    position = location.next();
+    if (position > length) {
+      position = length;
+      throw std::logic_error("seek too far");
+    }
+    pushBack = 0;
+  }
+
+  std::string SeekableFileInputStream::getName() const {
+    std::ostringstream result;
+    result << input->getName() << " from " << start << " for "
+           << length;
+    return result.str();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/77ed6682/c++/src/io/InputStream.hh
----------------------------------------------------------------------
diff --git a/c++/src/io/InputStream.hh b/c++/src/io/InputStream.hh
new file mode 100644
index 0000000..5ea8c7e
--- /dev/null
+++ b/c++/src/io/InputStream.hh
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_INPUTSTREAM_HH
+#define ORC_INPUTSTREAM_HH
+
+#include "Adaptor.hh"
+#include "orc/OrcFile.hh"
+#include "wrap/zero-copy-stream-wrapper.h"
+
+#include <list>
+#include <fstream>
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+namespace orc {
+
+  void printBuffer(std::ostream& out,
+                   const char *buffer,
+                   uint64_t length);
+
+  class PositionProvider {
+  private:
+    std::list<uint64_t>::const_iterator position;
+  public:
+    PositionProvider(const std::list<uint64_t>& positions);
+    uint64_t next();
+  };
+
+  /**
+   * A subclass of Google's ZeroCopyInputStream that supports seek.
+   * By extending Google's class, we get the ability to pass it directly
+   * to the protobuf readers.
+   */
+  class SeekableInputStream: public google::protobuf::io::ZeroCopyInputStream {
+  public:
+    virtual ~SeekableInputStream();
+    virtual void seek(PositionProvider& position) = 0;
+    virtual std::string getName() const = 0;
+  };
+
+  /**
+   * Create a seekable input stream based on a memory range.
+   */
+  class SeekableArrayInputStream: public SeekableInputStream {
+  private:
+    const char* data;
+    uint64_t length;
+    uint64_t position;
+    uint64_t blockSize;
+
+  public:
+    SeekableArrayInputStream(const unsigned char* list,
+                             uint64_t length,
+                             uint64_t block_size = 0);
+    SeekableArrayInputStream(const char* list,
+                             uint64_t length,
+                             uint64_t block_size = 0);
+    virtual ~SeekableArrayInputStream();
+    virtual bool Next(const void** data, int*size) override;
+    virtual void BackUp(int count) override;
+    virtual bool Skip(int count) override;
+    virtual google::protobuf::int64 ByteCount() const override;
+    virtual void seek(PositionProvider& position) override;
+    virtual std::string getName() const override;
+  };
+
+  /**
+   * Create a seekable input stream based on an input stream.
+   */
+  class SeekableFileInputStream: public SeekableInputStream {
+  private:
+    MemoryPool& pool;
+    InputStream* const input;
+    const uint64_t start;
+    const uint64_t length;
+    const uint64_t blockSize;
+    std::unique_ptr<DataBuffer<char> > buffer;
+    uint64_t position;
+    uint64_t pushBack;
+
+  public:
+    SeekableFileInputStream(InputStream* input,
+                            uint64_t offset,
+                            uint64_t byteCount,
+                            MemoryPool& pool,
+                            uint64_t blockSize = 0);
+    virtual ~SeekableFileInputStream();
+
+    virtual bool Next(const void** data, int*size) override;
+    virtual void BackUp(int count) override;
+    virtual bool Skip(int count) override;
+    virtual int64_t ByteCount() const override;
+    virtual void seek(PositionProvider& position) override;
+    virtual std::string getName() const override;
+  };
+
+}
+
+#endif //ORC_INPUTSTREAM_HH