You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by pr...@apache.org on 2017/05/23 20:00:30 UTC

orc git commit: ORC-191 Implement RLE v1 encoder

Repository: orc
Updated Branches:
  refs/heads/master 9c5261005 -> 44c291081


ORC-191 Implement RLE v1 encoder

Fixes #126

Signed-off-by: Prasanth Jayachandran <pr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/44c29108
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/44c29108
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/44c29108

Branch: refs/heads/master
Commit: 44c291081c70a50dcdbcb5e2ba98f71234e01a6e
Parents: 9c52610
Author: Xiening.Dai <xi...@alibaba-inc.com>
Authored: Thu May 18 13:39:06 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue May 23 12:55:35 2017 -0700

----------------------------------------------------------------------
 c++/src/ByteRLE.cc             | 273 +++++++++++++++++++++++++++++++++++-
 c++/src/ByteRLE.hh             |  48 ++++++-
 c++/src/RLE.cc                 |  20 +++
 c++/src/RLE.hh                 |  52 ++++++-
 c++/src/RLEv1.cc               | 165 ++++++++++++++++++++++
 c++/src/RLEv1.hh               |  57 +++++++-
 c++/test/CMakeLists.txt        |   2 +
 c++/test/TestByteRLEEncoder.cc | 234 +++++++++++++++++++++++++++++++
 c++/test/TestRLEv1Encoder.cc   | 246 ++++++++++++++++++++++++++++++++
 9 files changed, 1091 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/ByteRLE.cc
----------------------------------------------------------------------
diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index c34af73..c157792 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -27,6 +27,273 @@
 namespace orc {
 
   const size_t MINIMUM_REPEAT = 3;
+  const size_t MAXIMUM_REPEAT = 127 + MINIMUM_REPEAT;
+  const size_t MAX_LITERAL_SIZE = 128;
+
+  ByteRleEncoder::~ByteRleEncoder() {
+    // PASS
+  }
+
+  class ByteRleEncoderImpl : public ByteRleEncoder {
+  public:
+    ByteRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output);
+    virtual ~ByteRleEncoderImpl();
+
+    /**
+     * Encode the next batch of values
+     * @param data to be encoded
+     * @param numValues the number of values to be encoded
+     * @param notNull If the pointer is null, all values are read. If the
+     *    pointer is not null, positions that are false are skipped.
+     */
+    virtual void add(const char* data, uint64_t numValues,
+                      const char* notNull) override;
+
+    /**
+     * Get size of buffer used so far.
+     */
+    virtual uint64_t getBufferSize() const override;
+
+    /**
+     * Flushing underlying BufferedOutputStream
+    */
+    virtual uint64_t flush() override;
+
+    virtual void recordPosition(PositionRecorder* recorder) const override;
+
+  protected:
+    std::unique_ptr<BufferedOutputStream> outputStream;
+    char* literals;
+    int numLiterals;
+    bool repeat;
+    int tailRunLength;
+    int bufferPosition;
+    int bufferLength;
+    char* buffer;
+
+    void writeByte(char c);
+    void writeValues();
+    void write(char c);
+  };
+
+  ByteRleEncoderImpl::ByteRleEncoderImpl(
+                                std::unique_ptr<BufferedOutputStream> output)
+      : outputStream(std::move(output)),
+        literals(new char[MAX_LITERAL_SIZE]),  // freed in the destructor
+        numLiterals(0),
+        repeat(false),
+        tailRunLength(0),
+        bufferPosition(0),
+        bufferLength(0),
+        buffer(nullptr) {  // no chunk yet; writeByte fetches the first one
+  }
+
+  ByteRleEncoderImpl::~ByteRleEncoderImpl() {
+    // release the literal buffer allocated in the constructor
+    delete [] literals;
+  }
+
+  void ByteRleEncoderImpl::writeByte(char c) {
+    if (bufferPosition == bufferLength) {  // current chunk is full (or none fetched yet)
+      int addedSize = 0;
+      if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) {
+        throw std::bad_alloc();  // the stream could not supply another chunk
+      }
+      bufferPosition = 0;
+      bufferLength = addedSize;
+    }
+    buffer[bufferPosition++] = c;
+  }
+
+  void ByteRleEncoderImpl::add(
+                               const char* data,
+                               uint64_t numValues,
+                               const char* notNull) {
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        write(data[i]);
+      }
+    }
+  }
+
+  void ByteRleEncoderImpl::writeValues() {
+    if (numLiterals != 0) {  // nothing is emitted when no values are pending
+      if (repeat) {
+        writeByte(  // run header: run length biased by MINIMUM_REPEAT
+            static_cast<char>(numLiterals - static_cast<int>(MINIMUM_REPEAT)));
+        writeByte(literals[0]);  // the repeated byte is stored once
+     } else {
+        writeByte(static_cast<char>(-numLiterals));  // literal header: negated count
+        for (int i = 0; i < numLiterals; ++i) {
+          writeByte(literals[i]);
+        }
+      }
+      repeat = false;  // reset the run state for the next group
+      tailRunLength = 0;
+      numLiterals = 0;
+    }
+  }
+
+  uint64_t ByteRleEncoderImpl::flush() {
+    writeValues();  // emit any pending run or literal group first
+    outputStream->BackUp(bufferLength - bufferPosition);  // pass the count of unused bytes in the current chunk
+    uint64_t dataSize = outputStream->flush();
+    bufferLength = bufferPosition = 0;  // makes the next writeByte call Next() again
+    return dataSize;
+  }
+
+  void ByteRleEncoderImpl::write(char value) {
+    if (numLiterals == 0) {  // first value of a new group
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {  // currently inside a run of literals[0]
+      if (value == literals[0]) {
+        numLiterals += 1;
+        if (numLiterals == MAXIMUM_REPEAT) {  // run reached its maximum length
+          writeValues();
+        }
+      } else {
+        writeValues();  // run broken: emit it, then start over with value
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {  // literal mode: count how many trailing values are equal
+      if (value == literals[numLiterals - 1]) {
+        tailRunLength += 1;
+      } else {
+        tailRunLength = 1;
+      }
+      if (tailRunLength == MINIMUM_REPEAT) {
+        if (numLiterals + 1 == MINIMUM_REPEAT) {  // the whole group is the run
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1);  // take the run's first values back out of the literals
+          writeValues();  // emit the literals that precede the run
+          literals[0] = value;
+          repeat = true;
+          numLiterals = MINIMUM_REPEAT;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {  // literal buffer is full
+          writeValues();
+        }
+      }
+    }
+  }
+
+  uint64_t ByteRleEncoderImpl::getBufferSize() const {
+    return outputStream->getSize();
+  }
+
+  void ByteRleEncoderImpl::recordPosition(PositionRecorder *recorder) const {
+    uint64_t flushedSize = outputStream->getSize();
+    uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
+    if (outputStream->isCompressed()) {
+      // start of the compression chunk in the stream
+      recorder->add(flushedSize);
+      // number of decompressed bytes that need to be consumed
+      recorder->add(unflushedSize);
+    } else {
+      flushedSize -= static_cast<uint64_t>(bufferLength);
+      // byte offset of the RLE run's start location
+      recorder->add(flushedSize + unflushedSize);
+    }
+    recorder->add(static_cast<uint64_t>(numLiterals));  // values buffered but not yet written
+  }
+
+  std::unique_ptr<ByteRleEncoder> createByteRleEncoder
+                              (std::unique_ptr<BufferedOutputStream> output) {
+    return std::unique_ptr<ByteRleEncoder>(new ByteRleEncoderImpl
+                                           (std::move(output)));
+  }
+
+  class BooleanRleEncoderImpl : public ByteRleEncoderImpl {
+  public:
+    BooleanRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output);
+    virtual ~BooleanRleEncoderImpl();
+
+    /**
+     * Encode the next batch of values
+     * @param data to be encoded
+     * @param numValues the number of values to be encoded
+     * @param notNull If the pointer is null, all values are read. If the
+     *    pointer is not null, positions that are false are skipped.
+     */
+    virtual void add(const char* data, uint64_t numValues,
+                      const char* notNull) override;
+
+    /**
+     * Flushing underlying BufferedOutputStream
+     */
+    virtual uint64_t flush() override;
+
+    virtual void recordPosition(PositionRecorder* recorder) const override;
+
+  private:
+    int bitsRemained;
+    char current;
+
+  };
+
+  BooleanRleEncoderImpl::BooleanRleEncoderImpl(
+                        std::unique_ptr<BufferedOutputStream> output)
+                        : ByteRleEncoderImpl(std::move(output)) {
+    bitsRemained = 8;
+    current = static_cast<char>(0);
+  }
+
+  BooleanRleEncoderImpl::~BooleanRleEncoderImpl() {
+    // PASS
+  }
+
+  void BooleanRleEncoderImpl::add(
+                                  const char* data,
+                                  uint64_t numValues,
+                                  const char* notNull) {
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (bitsRemained == 0) {  // current byte is full: hand it to the byte encoder
+        write(current);
+        current = static_cast<char>(0);
+        bitsRemained = 8;
+      }
+      if (!notNull || notNull[i]) {  // positions marked null consume no bit
+        if (!data || data[i]) {  // a null data pointer sets the bit for every position
+          current =
+            static_cast<char>(current | (0x80 >> (8 - bitsRemained)));  // bits fill from the most significant end
+        }
+        --bitsRemained;
+      }
+    }
+    if (bitsRemained == 0) {  // emit a byte completed by the last value
+      write(current);
+      current = static_cast<char>(0);
+      bitsRemained = 8;
+    }
+  }
+
+  uint64_t BooleanRleEncoderImpl::flush() {
+    if (bitsRemained != 8) {  // a partially filled byte is pending
+      write(current);  // its unused low bits are still zero
+    }
+    bitsRemained = 8;
+    current = static_cast<char>(0);
+    return ByteRleEncoderImpl::flush();  // then flush the byte-level encoder
+  }
+
+  void BooleanRleEncoderImpl::recordPosition(PositionRecorder* recorder) const {
+    ByteRleEncoderImpl::recordPosition(recorder);
+    recorder->add(static_cast<uint64_t>(8 - bitsRemained));
+  }
+
+  std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder
+                                (std::unique_ptr<BufferedOutputStream> output) {
+    // The derived-to-base pointer conversion is implicit and always correct;
+    // reinterpret_cast would skip any pointer adjustment the layout requires.
+    return std::unique_ptr<ByteRleEncoder>(
+                          new BooleanRleEncoderImpl(std::move(output)));
+  }
 
   ByteRleDecoder::~ByteRleDecoder() {
     // PASS
@@ -346,7 +613,9 @@ namespace orc {
 
   std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder
                                  (std::unique_ptr<SeekableInputStream> input) {
-    BooleanRleDecoderImpl* decoder = new BooleanRleDecoderImpl(std::move(input)) ;
-    return std::unique_ptr<ByteRleDecoder>(reinterpret_cast<ByteRleDecoder*>(decoder));
+    BooleanRleDecoderImpl* decoder =
+      new BooleanRleDecoderImpl(std::move(input));
+    return std::unique_ptr<ByteRleDecoder>(
+                                    reinterpret_cast<ByteRleDecoder*>(decoder));
   }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/ByteRLE.hh
----------------------------------------------------------------------
diff --git a/c++/src/ByteRLE.hh b/c++/src/ByteRLE.hh
index 6762cb5..71ca579 100644
--- a/c++/src/ByteRLE.hh
+++ b/c++/src/ByteRLE.hh
@@ -21,10 +21,42 @@
 
 #include <memory>
 
-#include "Compression.hh"
+#include "io/InputStream.hh"
+#include "io/OutputStream.hh"
 
 namespace orc {
 
+  class ByteRleEncoder {
+  public:
+    virtual ~ByteRleEncoder();
+
+    /**
+     * Encode the next batch of values
+     * @param data to be encoded
+     * @param numValues the number of values to be encoded
+     * @param notNull If the pointer is null, all values are read. If the
+     *    pointer is not null, positions that are false are skipped.
+     */
+    virtual void add(const char* data, uint64_t numValues,
+                      const char* notNull) = 0;
+
+    /**
+     * Get size of buffer used so far.
+     */
+    virtual uint64_t getBufferSize() const = 0;
+
+    /**
+     * Flushing underlying output stream
+     */
+    virtual uint64_t flush() = 0;
+
+    /**
+     * record current position
+     * @param recorder use the recorder to record current positions
+     */
+    virtual void recordPosition(PositionRecorder* recorder) const = 0;
+  };
+
   class ByteRleDecoder {
   public:
     virtual ~ByteRleDecoder();
@@ -50,6 +82,20 @@ namespace orc {
   };
 
   /**
+   * Create a byte RLE encoder.
+   * @param output the output stream to write to
+   */
+  std::unique_ptr<ByteRleEncoder> createByteRleEncoder
+                                 (std::unique_ptr<BufferedOutputStream> output);
+
+  /**
+   * Create a boolean RLE encoder.
+   * @param output the output stream to write to
+   */
+  std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder
+                                 (std::unique_ptr<BufferedOutputStream> output);
+
+  /**
    * Create a byte RLE decoder.
    * @param input the input stream to read from
    */

http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/RLE.cc
----------------------------------------------------------------------
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index 51bd628..f4a5402 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -22,10 +22,30 @@
 
 namespace orc {
 
+  RleEncoder::~RleEncoder() {
+    // PASS
+  }
+
   RleDecoder::~RleDecoder() {
     // PASS
   }
 
+  std::unique_ptr<RleEncoder> createRleEncoder
+                         (std::unique_ptr<BufferedOutputStream> output,
+                          bool isSigned,
+                          RleVersion version,
+                          MemoryPool&) {
+    switch (static_cast<int64_t>(version)) {
+    case RleVersion_1:
+      // We don't have std::make_unique() yet.
+      return std::unique_ptr<RleEncoder>(new RleEncoderV1(std::move(output),
+                                                          isSigned));
+    case RleVersion_2:
+    default:
+      throw NotImplementedYet("Not implemented yet");
+    }
+  }
+
   std::unique_ptr<RleDecoder> createRleDecoder
                          (std::unique_ptr<SeekableInputStream> input,
                           bool isSigned,

http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/RLE.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index 0a44c95..43f7aa7 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -19,16 +19,53 @@
 #ifndef ORC_RLE_HH
 #define ORC_RLE_HH
 
-#include "Compression.hh"
+#include "io/InputStream.hh"
+#include "io/OutputStream.hh"
 
 #include <memory>
 
 namespace orc {
 
+  inline int64_t zigZag(int64_t value) {
+    return (value << 1) ^ (value >> 63);
+  }
+
   inline int64_t unZigZag(uint64_t value) {
     return value >> 1 ^ -(value & 1);
   }
 
+  class RleEncoder {
+  public:
+    // must be non-inline!
+    virtual ~RleEncoder();
+
+    /**
+     * Encode the next batch of values.
+     * @param data the array to read from
+     * @param numValues the number of values to write
+     * @param notNull If the pointer is null, all values are read. If the
+     *    pointer is not null, positions that are false are skipped.
+     */
+    virtual void add(const int64_t* data, uint64_t numValues,
+                      const char* notNull) = 0;
+
+    /**
+     * Get size of buffer used so far.
+     */
+    virtual uint64_t getBufferSize() const = 0;
+
+    /**
+     * Flushing underlying BufferedOutputStream
+     */
+    virtual uint64_t flush() = 0;
+
+    /**
+     * record current position
+     * @param recorder use the recorder to record current positions
+     */
+    virtual void recordPosition(PositionRecorder* recorder) const = 0;
+  };
+
   class RleDecoder {
   public:
     // must be non-inline!
@@ -61,6 +98,19 @@ namespace orc {
   };
 
   /**
+   * Create an RLE encoder.
+   * @param output the output stream to write to
+   * @param isSigned true if the number sequence is signed
+   * @param version version of RLE encoding to do
+   * @param pool memory pool to use for allocation
+   */
+  std::unique_ptr<RleEncoder> createRleEncoder
+                         (std::unique_ptr<BufferedOutputStream> output,
+                          bool isSigned,
+                          RleVersion version,
+                          MemoryPool& pool);
+
+  /**
    * Create an RLE decoder.
    * @param input the input stream to read from
    * @param isSigned true if the number sequence is signed

http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/RLEv1.cc
----------------------------------------------------------------------
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index 4205510..b8b1d72 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -26,8 +26,173 @@
 namespace orc {
 
 const uint64_t MINIMUM_REPEAT = 3;
+const uint64_t MAXIMUM_REPEAT = 127 + MINIMUM_REPEAT;
+
 const uint64_t BASE_128_MASK = 0x7f;
 
+const int MAX_DELTA = 127;
+const int MIN_DELTA = -128;
+const int MAX_LITERAL_SIZE = 128;
+
+RleEncoderV1::RleEncoderV1(
+                          std::unique_ptr<BufferedOutputStream> outStream,
+                          bool hasSigned)
+    : outputStream(std::move(outStream)),
+      isSigned(hasSigned),
+      literals(new int64_t[MAX_LITERAL_SIZE]),  // freed in the destructor
+      numLiterals(0),
+      delta(0),
+      repeat(false),
+      tailRunLength(0),
+      bufferPosition(0),
+      bufferLength(0),
+      buffer(nullptr) {  // no chunk yet; writeByte fetches the first one
+}
+
+RleEncoderV1::~RleEncoderV1() {
+  delete [] literals;
+}
+
+void RleEncoderV1::add(const int64_t* data, uint64_t numValues,
+                          const char* notNull) {
+  for (uint64_t i = 0; i < numValues; ++i) {
+    if (!notNull || notNull[i]) {
+      write(data[i]);
+    }
+  }
+}
+
+void RleEncoderV1::writeByte(char c) {
+  if (bufferPosition == bufferLength) {
+    int addedSize = 0;
+    if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) {
+      throw std::bad_alloc();
+    }
+    bufferPosition = 0;
+    bufferLength = addedSize;
+  }
+  buffer[bufferPosition++] = c;
+}
+
+void RleEncoderV1::writeValues() {
+  if (numLiterals != 0) {  // nothing is emitted when no values are pending
+    if (repeat) {
+      writeByte(static_cast<char>  // run header: run length biased by MINIMUM_REPEAT
+                (static_cast<uint64_t>(numLiterals) - MINIMUM_REPEAT));
+      writeByte(static_cast<char>(delta));  // step between consecutive values, one byte
+      if (isSigned) {
+        writeVslong(literals[0]);  // base value of the run
+      } else {
+        writeVulong(literals[0]);
+      }
+    } else {
+      writeByte(static_cast<char>(-numLiterals));  // literal header: negated count
+      for(int i=0; i < numLiterals; ++i) {
+        if (isSigned) {
+          writeVslong(literals[i]);
+        } else {
+          writeVulong(literals[i]);
+        }
+      }
+    }
+    repeat = false;  // reset the run state for the next group
+    numLiterals = 0;
+    tailRunLength = 0;
+  }
+}
+
+uint64_t RleEncoderV1::flush() {
+  writeValues();  // emit any pending run or literal group first
+  outputStream->BackUp(bufferLength - bufferPosition);  // pass the count of unused bytes in the current chunk
+  uint64_t dataSize = outputStream->flush();
+  bufferLength = bufferPosition = 0;  // makes the next writeByte call Next() again
+  return dataSize;
+}
+
+void RleEncoderV1::write(int64_t value) {
+  if (numLiterals == 0) {  // first value of a new group
+    literals[numLiterals++] = value;
+    tailRunLength = 1;
+  } else if (repeat) {  // inside a run: literals[0] is the base, delta the step
+    if (value == literals[0] + delta * numLiterals) {
+      numLiterals += 1;
+      if (numLiterals == MAXIMUM_REPEAT) {  // run reached its maximum length
+        writeValues();
+      }
+    } else {
+      writeValues();  // run broken: emit it, then start over with value
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    }
+  } else {  // literal mode: look for a tail with a constant, byte-sized delta
+    if (tailRunLength == 1) {
+      delta = value - literals[numLiterals - 1];
+      if (delta < MIN_DELTA || delta > MAX_DELTA) {  // writeValues stores delta in a single byte
+        tailRunLength = 1;
+      } else {
+        tailRunLength = 2;
+      }
+    } else if (value == literals[numLiterals - 1] + delta) {
+      tailRunLength += 1;
+    } else {
+      delta = value - literals[numLiterals - 1];  // tail broken: restart it from the previous value
+      if (delta < MIN_DELTA || delta > MAX_DELTA) {
+        tailRunLength = 1;
+      } else {
+        tailRunLength = 2;
+      }
+    }
+    if (tailRunLength == MINIMUM_REPEAT) {
+      if (numLiterals + 1 == MINIMUM_REPEAT) {  // the whole group is the run
+        repeat = true;
+        numLiterals += 1;
+      } else {
+        numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1);  // take the run's first values back out of the literals
+        int64_t base = literals[numLiterals];  // int64_t, not long: long is 32 bits on LLP64 targets and would truncate
+        writeValues();  // emit the literals that precede the run
+        literals[0] = base;
+        repeat = true;
+        numLiterals = MINIMUM_REPEAT;
+      }
+    } else {
+      literals[numLiterals++] = value;
+      if (numLiterals == MAX_LITERAL_SIZE) {  // literal buffer is full
+        writeValues();
+      }
+    }
+  }
+}
+
+void RleEncoderV1::writeVslong(int64_t val) {
+  writeVulong((val << 1) ^ (val >> 63));
+}
+
+void RleEncoderV1::writeVulong(int64_t val) {
+  while (true) {
+    if ((val & ~0x7f) == 0) {  // remaining value fits in 7 bits: last byte
+      writeByte(static_cast<char>(val));
+      return;
+    } else {
+      writeByte(static_cast<char>(0x80 | (val & 0x7f)));  // low 7 bits plus a continuation flag
+      // cast val to unsigned so as to force 0-fill right shift
+      val = (static_cast<uint64_t>(val) >> 7);
+    }
+  }
+}
+
+void RleEncoderV1::recordPosition(PositionRecorder* recorder) const {
+  uint64_t flushedSize = outputStream->getSize();
+  uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
+  if (outputStream->isCompressed()) {
+    recorder->add(flushedSize);
+    recorder->add(unflushedSize);
+  } else {
+    flushedSize -= static_cast<uint64_t>(bufferLength);
+    recorder->add(flushedSize + unflushedSize);
+  }
+  recorder->add(static_cast<uint64_t>(numLiterals));
+}
+
 signed char RleDecoderV1::readByte() {
   if (bufferStart == bufferEnd) {
     int bufferLength;

http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/RLEv1.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index 95e50a3..6b0855e 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -26,6 +26,59 @@
 
 namespace orc {
 
+class RleEncoderV1 : public RleEncoder {
+public:
+    RleEncoderV1(std::unique_ptr<BufferedOutputStream> outStream,
+                 bool hasSigned);
+    ~RleEncoderV1();
+
+    /**
+     * Encode the next batch of values.
+     * @param data the array to be written
+     * @param numValues the number of values to write
+     * @param notNull If the pointer is null, all values are written. If the
+     *    pointer is not null, positions that are false are skipped.
+     */
+    void add(const int64_t* data, uint64_t numValues,
+             const char* notNull) override;
+
+    /**
+     * Get size of buffer used so far.
+     */
+    uint64_t getBufferSize() const override {
+        return outputStream->getSize();
+    }
+
+    /**
+     * Flushing underlying BufferedOutputStream
+     */
+    uint64_t flush() override;
+
+    /**
+     * record current position
+     * @param recorder use the recorder to record current positions
+     */
+    virtual void recordPosition(PositionRecorder* recorder) const override;
+
+private:
+    std::unique_ptr<BufferedOutputStream> outputStream;
+    bool isSigned;
+    int64_t* literals;
+    int numLiterals;
+    int64_t delta;
+    bool repeat;
+    int tailRunLength;
+    int bufferPosition;
+    int bufferLength;
+    char* buffer;
+
+    void write(int64_t val);
+    void writeByte(char c);
+    void writeVulong(int64_t val);
+    void writeVslong(int64_t val);
+    void writeValues();
+};
+
 class RleDecoderV1 : public RleDecoder {
 public:
     RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
@@ -60,8 +113,8 @@ private:
     const bool isSigned;
     uint64_t remainingValues;
     int64_t value;
-    const char *bufferStart;
-    const char *bufferEnd;
+    const char* bufferStart;
+    const char* bufferEnd;
     int64_t delta;
     bool repeating;
 };

http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index 57a0b9f..f53e553 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -27,6 +27,7 @@ add_executable (orc-test
   MemoryOutputStream.cc
   TestBufferedOutputStream.cc
   TestByteRle.cc
+  TestByteRLEEncoder.cc
   TestColumnPrinter.cc
   TestColumnReader.cc
   TestCompression.cc
@@ -35,6 +36,7 @@ add_executable (orc-test
   TestInt128.cc
   TestReader.cc
   TestRle.cc
+  TestRLEv1Encoder.cc
   TestStripeIndexStatistics.cc
   TestTimestampStatistics.cc
   TestTimezone.cc

http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/test/TestByteRLEEncoder.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestByteRLEEncoder.cc b/c++/test/TestByteRLEEncoder.cc
new file mode 100644
index 0000000..c5db559
--- /dev/null
+++ b/c++/test/TestByteRLEEncoder.cc
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ByteRLE.hh"
+#include "MemoryOutputStream.hh"
+
+#include "wrap/gtest-wrapper.h"
+#include "wrap/orc-proto-wrapper.hh"
+
+#include <cstdlib>
+
+namespace orc {
+
+  const int DEFAULT_MEM_STREAM_SIZE = 1024 * 1024; // 1M
+
+  void generateNotNull(uint64_t numValues, uint64_t numNulls,
+                       char* notNull) {
+    if (numNulls != 0 && notNull != nullptr) {
+      memset(notNull, 1, numValues);  // start with every position non-null
+      if (numNulls > numValues) numNulls = numValues;  // more nulls than positions would never terminate
+      while (numNulls > 0) {
+        uint64_t pos = static_cast<uint64_t>(std::rand()) % numValues;
+        if (notNull[pos]) {  // only count positions that were not already nulled
+          notNull[pos] = static_cast<char>(0);
+          --numNulls;
+        }
+      }
+    }
+  }
+
+  void generateData(uint64_t numValues,
+                      char* data,
+                      uint64_t numNulls = 0,
+                      char* notNull = nullptr) {
+    generateNotNull(numValues, numNulls, notNull);
+    for (uint64_t i = 0; i < numValues; ++i) {
+        data[i] = static_cast<char>(std::rand() % 256);
+    }
+  }
+
+  void generateBoolData(uint64_t numValues,
+                          char* data,
+                          uint64_t numNulls = 0,
+                          char* notNull = nullptr) {
+    generateNotNull(numValues, numNulls, notNull);
+    for (uint64_t i = 0; i < numValues; ++i) {
+        data[i] = static_cast<char>(std::rand() % 2);
+    }
+  }
+
+  void decodeAndVerify(
+                       const MemoryOutputStream& memStream,
+                       char * data,
+                       uint64_t numValues,
+                       char* notNull) {
+
+    std::unique_ptr<SeekableInputStream> inStream(
+      new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
+
+    std::unique_ptr<ByteRleDecoder> decoder =
+      createByteRleDecoder(std::move(inStream));
+
+    char* decodedData = new char[numValues];
+    decoder->next(decodedData, numValues, notNull);
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        EXPECT_EQ(data[i], decodedData[i]);
+      }
+    }
+
+    delete [] decodedData;
+  }
+
+  void decodeAndVerifyBoolean(
+                       const MemoryOutputStream& memStream,
+                       char * data,
+                       uint64_t numValues,
+                       char* notNull) {
+
+    std::unique_ptr<SeekableInputStream> inStream(
+      new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
+
+    std::unique_ptr<ByteRleDecoder> decoder =
+      createBooleanRleDecoder(std::move(inStream));
+
+    char* decodedData = new char[numValues];
+    decoder->next(decodedData, numValues, notNull);
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        bool expect = data[i] != 0;
+        bool actual = decodedData[i] != 0;
+        EXPECT_EQ(expect, actual);
+      }
+    }
+
+    delete [] decodedData;
+  }
+
+  TEST(ByteRleEncoder, random_chars) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+      createByteRleEncoder(std::move(outStream));
+
+    char* data = new char[102400];
+    generateData(102400, data);
+    encoder->add(data, 102400, nullptr);
+    encoder->flush();
+
+    decodeAndVerify(memStream, data, 102400, nullptr);
+    delete [] data;
+  }
+
+  TEST(ByteRleEncoder, random_chars_with_null) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+      createByteRleEncoder(std::move(outStream));
+
+    char* notNull = new char[102400];
+    char* data = new char[102400];
+    generateData(102400, data, 377, notNull);
+    encoder->add(data, 102400, notNull);
+    encoder->flush();
+
+    decodeAndVerify(memStream, data, 102400, notNull);
+    delete [] data;
+    delete [] notNull;
+  }
+
+  TEST(BooleanRleEncoder, random_bits_not_aligned) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+      createBooleanRleEncoder(std::move(outStream));
+
+    char* data = new char[1779];
+    generateBoolData(1779, data);
+    encoder->add(data, 1779, nullptr);
+    encoder->flush();
+
+    decodeAndVerifyBoolean(memStream, data, 1779, nullptr);
+    delete [] data;
+  }
+
+  TEST(BooleanRleEncoder, random_bits_aligned) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+      createBooleanRleEncoder(std::move(outStream));
+
+    char* data = new char[8000];
+    generateBoolData(8000, data);
+    encoder->add(data, 8000, nullptr);
+    encoder->flush();
+
+    decodeAndVerifyBoolean(memStream, data, 8000, nullptr);
+    delete [] data;
+  }
+
+  TEST(BooleanRleEncoder, random_bits_aligned_with_null) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+      createBooleanRleEncoder(std::move(outStream));
+
+    char* notNull = new char[8000];
+    char* data = new char[8000];
+    generateBoolData(8000, data, 515, notNull);
+    encoder->add(data, 8000, notNull);
+    encoder->flush();
+
+    decodeAndVerifyBoolean(memStream, data, 8000, notNull);
+    delete [] data;
+    delete [] notNull;
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/test/TestRLEv1Encoder.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestRLEv1Encoder.cc b/c++/test/TestRLEv1Encoder.cc
new file mode 100644
index 0000000..9762a5c
--- /dev/null
+++ b/c++/test/TestRLEv1Encoder.cc
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdlib>
+
+#include "MemoryOutputStream.hh"
+#include "RLEv1.hh"
+
+#include "wrap/orc-proto-wrapper.hh"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  const int DEFAULT_MEM_STREAM_SIZE = 1024 * 1024; // 1 MiB; the size each test below passes to its MemoryOutputStream
+
+  void generateData( // test helper: optionally builds a not-null mask, then fills data[0..numValues) for the non-null slots
+                         uint64_t numValues, // number of entries in data (and in notNull when given)
+                         int64_t start, // first value of the arithmetic sequence; only used when random is false
+                         int64_t delta, // step of the arithmetic sequence; only used when random is false
+                         bool random, // true: each written slot gets std::rand() instead of the sequence
+                         int64_t* data, // output array; slots whose notNull entry is 0 are left unwritten
+                         uint64_t numNulls = 0, // how many distinct mask entries to clear
+                         char* notNull = nullptr) { // optional output mask: 1 = value present, 0 = null
+    if (numNulls != 0 && notNull != nullptr) { // NOTE(review): a non-null notNull with numNulls == 0 is not initialized here, yet it is still read in the loop below
+      memset(notNull, 1, numValues); // start with every slot present. NOTE(review): memset is declared in <cstring>; this file includes only <cstdlib> directly
+      while (numNulls > 0) { // NOTE(review): cannot terminate when numNulls > numValues, and % numValues below divides by zero when numValues == 0
+        uint64_t pos = static_cast<uint64_t>(std::rand()) % numValues; // pick a random slot
+        if (notNull[pos]) { // only count slots that have not been cleared already, so exactly numNulls distinct slots end up null
+          notNull[pos] = static_cast<char>(0);
+          --numNulls;
+        }
+      }
+    }
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (notNull == nullptr || notNull[i]) // skip slots marked null
+      {
+        if (!random) {
+          data[i] = start + delta * static_cast<int64_t>(i); // i-th term of the arithmetic sequence
+        } else {
+          data[i] = std::rand();
+        }
+      }
+    }
+  }
+
+  void decodeAndVerify( // test helper: decodes numValues integers from memStream with RleDecoderV1 and compares them with data
+                       const MemoryOutputStream& memStream, // its getData()/getLength() bytes are the decoder's input
+                       int64_t * data, // expected values
+                       uint64_t numValues, // number of entries to decode and compare
+                       const char* notNull, // optional mask; slots whose entry is 0 are not compared
+                       bool isSinged) { // forwarded to the decoder constructor; presumably meant to read "isSigned" (typo)
+    RleDecoderV1 decoder(
+      std::unique_ptr<SeekableInputStream>(
+        new SeekableArrayInputStream(
+                                    memStream.getData(),
+                                    memStream.getLength())),
+        isSinged);
+
+    int64_t* decodedData = new int64_t[numValues]; // NOTE(review): raw new[]; released by the delete [] at the end of the function
+    decoder.next(decodedData, numValues, notNull); // the mask is passed through to the decoder as well
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) { // without a mask every slot is checked; with one, null slots are skipped
+        EXPECT_EQ(data[i], decodedData[i]); // non-fatal check, so all mismatches in the array get reported
+      }
+    }
+
+    delete [] decodedData;
+  }
+
+  TEST(RleEncoderV1, delta_increasing_sequance_unsigned) { // round-trips the sequence 0, 1, ..., 1023 with no null mask
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block); // NOTE(review): never referenced again in this test; the encoder gets its own stream below
+
+    RleEncoderV1 encoder(
+      std::unique_ptr<BufferedOutputStream>(
+              new BufferedOutputStream(*pool, &memStream, capacity, block)),
+          false); // presumably the signed flag; the same value is passed to decodeAndVerify below
+
+    int64_t* data = new int64_t[1024];
+    generateData(1024, 0, 1, false, data); // start 0, step +1, not random
+    encoder.add(data, 1024, nullptr); // nullptr: no not-null mask
+    encoder.flush();
+
+    decodeAndVerify(memStream, data, 1024, nullptr, false);
+    delete [] data;
+  }
+
+  TEST(RleEncoderV1, delta_increasing_sequance_unsigned_null) { // increasing sequence (start 0, step +1) with 100 of the 1024 slots marked null
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block); // NOTE(review): never referenced again in this test; the encoder gets its own stream below
+
+    RleEncoderV1 encoder(
+      std::unique_ptr<BufferedOutputStream>(
+              new BufferedOutputStream(*pool, &memStream, capacity, block)),
+          false); // presumably the signed flag; the same value is passed to decodeAndVerify below
+
+    char* notNull = new char[1024]; // mask filled in by generateData
+    int64_t* data = new int64_t[1024]; // not value-initialized: slots that end up null stay unwritten
+    generateData(1024, 0, 1, false, data, 100, notNull); // value at slot i is i; 100 randomly chosen slots are cleared in notNull
+    encoder.add(data, 1024, notNull);
+    encoder.flush();
+
+    decodeAndVerify(memStream, data, 1024, notNull, false); // null slots are skipped by the comparison
+    delete [] data;
+    delete [] notNull;
+  }
+
+  TEST(RleEncoderV1, delta_decreasing_sequance_unsigned) { // round-trips a decreasing sequence whose values all stay non-negative
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block); // NOTE(review): never referenced again in this test; the encoder gets its own stream below
+
+    RleEncoderV1 encoder(
+      std::unique_ptr<BufferedOutputStream>(
+              new BufferedOutputStream(*pool, &memStream, capacity, block)),
+          false); // presumably the signed flag; the same value is passed to decodeAndVerify below
+
+    int64_t* data = new int64_t[1024];
+    generateData(1024, 5000, -3, false, data); // 5000, 4997, ..., 1931 (= 5000 - 3 * 1023)
+    encoder.add(data, 1024, nullptr); // nullptr: no not-null mask
+    encoder.flush();
+
+    decodeAndVerify(memStream, data, 1024, nullptr, false);
+    delete [] data;
+  }
+
+  TEST(RleEncoderV1, delta_decreasing_sequance_signed) { // round-trips a decreasing sequence that crosses zero into negative values
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block); // NOTE(review): never referenced again in this test; the encoder gets its own stream below
+
+    RleEncoderV1 encoder(
+      std::unique_ptr<BufferedOutputStream>(
+              new BufferedOutputStream(*pool, &memStream, capacity, block)),
+          true); // presumably the signed flag; the same value is passed to decodeAndVerify below
+
+    int64_t* data = new int64_t[1024];
+    generateData(1024, 100, -3, false, data); // 100, 97, ..., -2969 (= 100 - 3 * 1023)
+    encoder.add(data, 1024, nullptr); // nullptr: no not-null mask
+    encoder.flush();
+
+    decodeAndVerify(memStream, data, 1024, nullptr, true);
+    delete [] data;
+  }
+
+  TEST(RleEncoderV1, delta_decreasing_sequance_signed_null) { // decreasing sequence (start 100, step -3) with 500 of the 1024 slots marked null
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block); // NOTE(review): never referenced again in this test; the encoder gets its own stream below
+
+    RleEncoderV1 encoder(
+      std::unique_ptr<BufferedOutputStream>(
+              new BufferedOutputStream(*pool, &memStream, capacity, block)),
+          true); // presumably the signed flag; the same value is passed to decodeAndVerify below
+
+    char* notNull = new char[1024]; // mask filled in by generateData
+    int64_t* data = new int64_t[1024]; // not value-initialized: slots that end up null stay unwritten
+    generateData(1024, 100, -3, false, data, 500, notNull); // value at slot i is 100 - 3 * i; 500 randomly chosen slots are cleared in notNull
+    encoder.add(data, 1024, notNull);
+    encoder.flush();
+
+    decodeAndVerify(memStream, data, 1024, notNull, true); // null slots are skipped by the comparison
+    delete [] data;
+    delete [] notNull;
+  }
+
+  TEST(RleEncoderV1, random_sequance_signed) { // round-trips 1024 std::rand() values with no null mask
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block); // NOTE(review): never referenced again in this test; the encoder gets its own stream below
+
+    RleEncoderV1 encoder(
+      std::unique_ptr<BufferedOutputStream>(
+              new BufferedOutputStream(*pool, &memStream, capacity, block)),
+          true); // presumably the signed flag; the same value is passed to decodeAndVerify below
+
+    int64_t* data = new int64_t[1024];
+    generateData(1024, 0, 0, true, data); // random = true, so the start and delta arguments (0, 0) are not used
+    encoder.add(data, 1024, nullptr); // nullptr: no not-null mask
+    encoder.flush();
+
+    decodeAndVerify(memStream, data, 1024, nullptr, true);
+    delete [] data;
+  }
+
+  TEST(RleEncoderV1, all_null) { // encodes and decodes 1024 slots that are all marked null
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block); // NOTE(review): never referenced again in this test; the encoder gets its own stream below
+
+    RleEncoderV1 encoder(
+      std::unique_ptr<BufferedOutputStream>(
+              new BufferedOutputStream(*pool, &memStream, capacity, block)),
+          true); // presumably the signed flag; the same value is passed to decodeAndVerify below
+
+    char* notNull = new char[1024]; // mask filled in by generateData
+    int64_t* data = new int64_t[1024]; // never written in this test, because every slot ends up null
+    generateData(1024, 100, -3, false, data, 1024, notNull); // numNulls == numValues, so every mask entry is cleared
+    encoder.add(data, 1024, notNull);
+    encoder.flush();
+
+    decodeAndVerify(memStream, data, 1024, notNull, true); // with every slot null, no EXPECT_EQ comparison is executed
+    delete [] data;
+    delete [] notNull;
+  }
+}