You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by pr...@apache.org on 2017/05/23 20:00:30 UTC
orc git commit: ORC-191 Implement RLE v1 encoder
Repository: orc
Updated Branches:
refs/heads/master 9c5261005 -> 44c291081
ORC-191 Implement RLE v1 encoder
Fixes #126
Signed-off-by: Prasanth Jayachandran <pr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/44c29108
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/44c29108
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/44c29108
Branch: refs/heads/master
Commit: 44c291081c70a50dcdbcb5e2ba98f71234e01a6e
Parents: 9c52610
Author: Xiening.Dai <xi...@alibaba-inc.com>
Authored: Thu May 18 13:39:06 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue May 23 12:55:35 2017 -0700
----------------------------------------------------------------------
c++/src/ByteRLE.cc | 273 +++++++++++++++++++++++++++++++++++-
c++/src/ByteRLE.hh | 48 ++++++-
c++/src/RLE.cc | 20 +++
c++/src/RLE.hh | 52 ++++++-
c++/src/RLEv1.cc | 165 ++++++++++++++++++++++
c++/src/RLEv1.hh | 57 +++++++-
c++/test/CMakeLists.txt | 2 +
c++/test/TestByteRLEEncoder.cc | 234 +++++++++++++++++++++++++++++++
c++/test/TestRLEv1Encoder.cc | 246 ++++++++++++++++++++++++++++++++
9 files changed, 1091 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/ByteRLE.cc
----------------------------------------------------------------------
diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index c34af73..c157792 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -27,6 +27,273 @@
namespace orc {
const size_t MINIMUM_REPEAT = 3;
+  // A repeat run's header byte holds (run length - MINIMUM_REPEAT), which is
+  // at most 127, so the longest repeat run is MINIMUM_REPEAT + 127 values.
+  const size_t MAXIMUM_REPEAT = MINIMUM_REPEAT + 127;
+  // A literal run's header byte holds -(run length); 128 values is the limit.
+  const size_t MAX_LITERAL_SIZE = 128;
+
+  // Out-of-line, defaulted destructor for the abstract encoder interface.
+  ByteRleEncoder::~ByteRleEncoder() = default;
+
+  /**
+   * Byte RLE encoder. Incoming bytes are staged in `literals` and emitted
+   * either as a repeat run (header byte, value) or as a literal run
+   * (negative header byte followed by every staged byte).
+   */
+  class ByteRleEncoderImpl : public ByteRleEncoder {
+  public:
+    // explicit: an output stream is not implicitly an encoder.
+    explicit ByteRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output);
+    ~ByteRleEncoderImpl() override;
+
+    /**
+     * Encode the next batch of values
+     * @param data to be encoded
+     * @param numValues the number of values to be encoded
+     * @param notNull If the pointer is null, all values are read. If the
+     * pointer is not null, positions that are false are skipped.
+     */
+    void add(const char* data, uint64_t numValues,
+             const char* notNull) override;
+
+    /**
+     * Get size of buffer used so far (as reported by the output stream).
+     */
+    uint64_t getBufferSize() const override;
+
+    /**
+     * Emit any pending run and flush the underlying BufferedOutputStream.
+     * @return the value returned by the stream's flush()
+     */
+    uint64_t flush() override;
+
+    /**
+     * Record the current stream position and the number of pending values.
+     */
+    void recordPosition(PositionRecorder* recorder) const override;
+
+  protected:
+    std::unique_ptr<BufferedOutputStream> outputStream;
+    char* literals;      // staged values; array of MAX_LITERAL_SIZE bytes
+    int numLiterals;     // number of staged values
+    bool repeat;         // true when the staged values form a repeat run
+    int tailRunLength;   // length of the identical-value run at the tail
+    int bufferPosition;  // next write offset inside `buffer`
+    int bufferLength;    // size of the chunk obtained from outputStream
+    char* buffer;        // current output chunk (not owned)
+
+    void writeByte(char c);
+    void writeValues();
+    void write(char c);
+  };
+
+  // Take ownership of the output stream and start with no staged values and
+  // no output chunk.
+  ByteRleEncoderImpl::ByteRleEncoderImpl(
+      std::unique_ptr<BufferedOutputStream> output)
+      : outputStream(std::move(output)),
+        literals(new char[MAX_LITERAL_SIZE]),
+        numLiterals(0),
+        repeat(false),
+        tailRunLength(0),
+        bufferPosition(0),
+        bufferLength(0),
+        buffer(nullptr) {
+  }
+
+  ByteRleEncoderImpl::~ByteRleEncoderImpl() {
+    // Only the staging array is owned here; `buffer` belongs to the stream.
+    delete[] literals;
+  }
+
+  // Append one byte to the output, requesting a new chunk from the stream
+  // when the current one is full. Throws std::bad_alloc if Next() fails.
+  void ByteRleEncoderImpl::writeByte(char c) {
+    const bool chunkFull = (bufferPosition == bufferLength);
+    if (chunkFull) {
+      int chunkSize = 0;
+      const bool gotChunk =
+          outputStream->Next(reinterpret_cast<void**>(&buffer), &chunkSize);
+      if (!gotChunk) {
+        throw std::bad_alloc();
+      }
+      bufferLength = chunkSize;
+      bufferPosition = 0;
+    }
+    buffer[bufferPosition] = c;
+    ++bufferPosition;
+  }
+
+  // Feed every present value (per the optional notNull mask) to write().
+  void ByteRleEncoderImpl::add(const char* data, uint64_t numValues,
+                               const char* notNull) {
+    for (uint64_t idx = 0; idx < numValues; ++idx) {
+      const bool isPresent = (notNull == nullptr) || (notNull[idx] != 0);
+      if (isPresent) {
+        write(data[idx]);
+      }
+    }
+  }
+
+  // Emit the staged values as one run and reset the staging state.
+  void ByteRleEncoderImpl::writeValues() {
+    if (numLiterals == 0) {
+      return;  // nothing staged
+    }
+    if (repeat) {
+      // Repeat run: header = length - MINIMUM_REPEAT, then the value once.
+      const int header = numLiterals - static_cast<int>(MINIMUM_REPEAT);
+      writeByte(static_cast<char>(header));
+      writeByte(literals[0]);
+    } else {
+      // Literal run: header = -length, then each staged value.
+      writeByte(static_cast<char>(-numLiterals));
+      for (int idx = 0; idx < numLiterals; ++idx) {
+        writeByte(literals[idx]);
+      }
+    }
+    repeat = false;
+    tailRunLength = 0;
+    numLiterals = 0;
+  }
+
+  // Emit any staged run, hand the unwritten remainder of the current chunk
+  // back to the stream via BackUp(), then flush the stream.
+  uint64_t ByteRleEncoderImpl::flush() {
+    writeValues();
+    const int unusedBytes = bufferLength - bufferPosition;
+    outputStream->BackUp(unusedBytes);
+    const uint64_t flushedSize = outputStream->flush();
+    bufferPosition = 0;
+    bufferLength = 0;
+    return flushedSize;
+  }
+
+ // Feed one value into the run-detection state machine. Staged values live in
+ // literals[0..numLiterals); when `repeat` is true they are a run of
+ // literals[0]. Runs are emitted via writeValues() once they reach their
+ // maximum length or the pattern breaks.
+ void ByteRleEncoderImpl::write(char value) {
+ if (numLiterals == 0) {
+ // Nothing staged: start a new literal sequence.
+ literals[numLiterals++] = value;
+ tailRunLength = 1;
+ } else if (repeat) {
+ if (value == literals[0]) {
+ // Extend the repeat run; emit it once it reaches MAXIMUM_REPEAT.
+ numLiterals += 1;
+ if (numLiterals == MAXIMUM_REPEAT) {
+ writeValues();
+ }
+ } else {
+ // Run broken: emit it and start a literal sequence with this value.
+ writeValues();
+ literals[numLiterals++] = value;
+ tailRunLength = 1;
+ }
+ } else {
+ // Literal mode: count identical values at the tail (including this one).
+ if (value == literals[numLiterals - 1]) {
+ tailRunLength += 1;
+ } else {
+ tailRunLength = 1;
+ }
+ if (tailRunLength == MINIMUM_REPEAT) {
+ if (numLiterals + 1 == MINIMUM_REPEAT) {
+ // All staged values equal this one: convert in place to a repeat run.
+ repeat = true;
+ numLiterals += 1;
+ } else {
+ // Emit the literals before the identical tail, then restart as a
+ // repeat run of MINIMUM_REPEAT copies of this value.
+ numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1);
+ writeValues();
+ literals[0] = value;
+ repeat = true;
+ numLiterals = MINIMUM_REPEAT;
+ }
+ } else {
+ literals[numLiterals++] = value;
+ // A literal run holds at most MAX_LITERAL_SIZE values.
+ if (numLiterals == MAX_LITERAL_SIZE) {
+ writeValues();
+ }
+ }
+ }
+ }
+
+  // Report the size tracked by the underlying output stream.
+  uint64_t ByteRleEncoderImpl::getBufferSize() const {
+    const uint64_t streamSize = outputStream->getSize();
+    return streamSize;
+  }
+
+  // Record the output position followed by the number of staged values.
+  void ByteRleEncoderImpl::recordPosition(PositionRecorder* recorder) const {
+    const uint64_t streamSize = outputStream->getSize();
+    const uint64_t usedInChunk = static_cast<uint64_t>(bufferPosition);
+    if (!outputStream->isCompressed()) {
+      // Byte offset of the RLE run's start: the stream size less the
+      // not-yet-written remainder of the current chunk.
+      const uint64_t chunkStart =
+          streamSize - static_cast<uint64_t>(bufferLength);
+      recorder->add(chunkStart + usedInChunk);
+    } else {
+      // Start of the compression chunk, then decompressed bytes to skip.
+      recorder->add(streamSize);
+      recorder->add(usedInChunk);
+    }
+    recorder->add(static_cast<uint64_t>(numLiterals));
+  }
+
+  // Factory for the plain byte RLE encoder; takes ownership of `output`.
+  std::unique_ptr<ByteRleEncoder> createByteRleEncoder(
+      std::unique_ptr<BufferedOutputStream> output) {
+    std::unique_ptr<ByteRleEncoder> encoder(
+        new ByteRleEncoderImpl(std::move(output)));
+    return encoder;
+  }
+
+  /**
+   * Boolean RLE encoder: packs eight values per byte, most significant bit
+   * first, and feeds each completed byte to the byte RLE encoder.
+   */
+  class BooleanRleEncoderImpl : public ByteRleEncoderImpl {
+  public:
+    // explicit: an output stream is not implicitly an encoder.
+    explicit BooleanRleEncoderImpl(
+        std::unique_ptr<BufferedOutputStream> output);
+    ~BooleanRleEncoderImpl() override;
+
+    /**
+     * Encode the next batch of values
+     * @param data values to encode; if null, every present value is true
+     * @param numValues the number of values to be encoded
+     * @param notNull If the pointer is null, all values are read. If the
+     * pointer is not null, positions that are false are skipped.
+     */
+    void add(const char* data, uint64_t numValues,
+             const char* notNull) override;
+
+    /**
+     * Write any partially filled byte, then flush the byte RLE encoder.
+     */
+    uint64_t flush() override;
+
+    /**
+     * Record the byte RLE position followed by the number of bits already
+     * used in the byte being assembled.
+     */
+    void recordPosition(PositionRecorder* recorder) const override;
+
+  private:
+    int bitsRemained;  // free bit slots in `current` (8 means empty)
+    char current;      // byte currently being assembled
+  };
+
+  // Start with an empty byte: all eight bit slots free.
+  BooleanRleEncoderImpl::BooleanRleEncoderImpl(
+      std::unique_ptr<BufferedOutputStream> output)
+      : ByteRleEncoderImpl(std::move(output)),
+        bitsRemained(8),
+        current(static_cast<char>(0)) {
+  }
+
+  // Nothing extra to release; the base class owns the staging array.
+  BooleanRleEncoderImpl::~BooleanRleEncoderImpl() = default;
+
+  // Pack present values into bits (MSB first) and hand every completed byte
+  // to the byte RLE encoder. A null `data` pointer means all values are true.
+  void BooleanRleEncoderImpl::add(const char* data,
+                                  uint64_t numValues,
+                                  const char* notNull) {
+    auto emitIfFull = [this]() {
+      if (bitsRemained == 0) {
+        write(current);
+        current = static_cast<char>(0);
+        bitsRemained = 8;
+      }
+    };
+
+    for (uint64_t idx = 0; idx < numValues; ++idx) {
+      emitIfFull();
+      const bool isPresent = notNull == nullptr || notNull[idx] != 0;
+      if (!isPresent) {
+        continue;  // null slots do not occupy a bit
+      }
+      const bool bit = data == nullptr || data[idx] != 0;
+      if (bit) {
+        const int shift = 8 - bitsRemained;
+        current = static_cast<char>(current | (0x80 >> shift));
+      }
+      --bitsRemained;
+    }
+    emitIfFull();
+  }
+
+  // Write out a partially filled byte (unused low bits stay zero), reset the
+  // bit state, then flush the byte RLE layer.
+  uint64_t BooleanRleEncoderImpl::flush() {
+    const bool hasPartialByte = (bitsRemained != 8);
+    if (hasPartialByte) {
+      write(current);
+    }
+    current = static_cast<char>(0);
+    bitsRemained = 8;
+    return ByteRleEncoderImpl::flush();
+  }
+
+  // Byte-level position first, then how many bits of the current byte are used.
+  void BooleanRleEncoderImpl::recordPosition(PositionRecorder* recorder) const {
+    ByteRleEncoderImpl::recordPosition(recorder);
+    const uint64_t bitsUsed = static_cast<uint64_t>(8 - bitsRemained);
+    recorder->add(bitsUsed);
+  }
+
+  /**
+   * Factory for the boolean RLE encoder; takes ownership of `output`.
+   * BooleanRleEncoderImpl publicly derives from ByteRleEncoder (through
+   * ByteRleEncoderImpl), so the implicit derived-to-base conversion is the
+   * correct upcast. reinterpret_cast would only work by layout coincidence.
+   */
+  std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder(
+      std::unique_ptr<BufferedOutputStream> output) {
+    return std::unique_ptr<ByteRleEncoder>(
+        new BooleanRleEncoderImpl(std::move(output)));
+  }
ByteRleDecoder::~ByteRleDecoder() {
// PASS
@@ -346,7 +613,9 @@ namespace orc {
std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder
(std::unique_ptr<SeekableInputStream> input) {
- BooleanRleDecoderImpl* decoder = new BooleanRleDecoderImpl(std::move(input)) ;
- return std::unique_ptr<ByteRleDecoder>(reinterpret_cast<ByteRleDecoder*>(decoder));
+    // Use the implicit derived-to-base conversion instead of reinterpret_cast,
+    // which is only correct when the base subobject sits at offset zero.
+    // Assumes BooleanRleDecoderImpl publicly derives from ByteRleDecoder
+    // (its definition is outside this hunk) -- confirm.
+    return std::unique_ptr<ByteRleDecoder>(
+        new BooleanRleDecoderImpl(std::move(input)));
}
}
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/ByteRLE.hh
----------------------------------------------------------------------
diff --git a/c++/src/ByteRLE.hh b/c++/src/ByteRLE.hh
index 6762cb5..71ca579 100644
--- a/c++/src/ByteRLE.hh
+++ b/c++/src/ByteRLE.hh
@@ -21,10 +21,42 @@
#include <memory>
-#include "Compression.hh"
+#include "io/InputStream.hh"
+#include "io/OutputStream.hh"
namespace orc {
+  /**
+   * Interface for byte-oriented run-length encoders. Instances come from
+   * createByteRleEncoder() or createBooleanRleEncoder().
+   */
+  class ByteRleEncoder {
+  public:
+    virtual ~ByteRleEncoder();
+
+    /**
+     * Append a batch of values to the encoder.
+     * @param data values to encode
+     * @param numValues number of entries in data (and in notNull, if given)
+     * @param notNull optional presence mask: nullptr encodes every value,
+     *        otherwise entries whose mask byte is zero are skipped
+     */
+    virtual void add(const char* data, uint64_t numValues,
+                     const char* notNull) = 0;
+
+    /**
+     * @return size of the buffer used so far
+     */
+    virtual uint64_t getBufferSize() const = 0;
+
+    /**
+     * Emit pending values and flush the underlying output stream.
+     * @return flushed data size
+     */
+    virtual uint64_t flush() = 0;
+
+    /**
+     * Record the current position.
+     * @param recorder receives the position entries
+     */
+    virtual void recordPosition(PositionRecorder* recorder) const = 0;
+  };
+
class ByteRleDecoder {
public:
virtual ~ByteRleDecoder();
@@ -50,6 +82,20 @@ namespace orc {
};
/**
+ * Create a byte RLE encoder.
+ * @param output the output stream to write to; the encoder takes ownership
+ * @return an encoder that run-length encodes the bytes passed to add()
+ */
+ std::unique_ptr<ByteRleEncoder> createByteRleEncoder
+ (std::unique_ptr<BufferedOutputStream> output);
+
+ /**
+ * Create a boolean RLE encoder.
+ * Values are packed eight per byte (most significant bit first) and the
+ * packed bytes are then byte-RLE encoded.
+ * @param output the output stream to write to; the encoder takes ownership
+ */
+ std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder
+ (std::unique_ptr<BufferedOutputStream> output);
+
+ /**
* Create a byte RLE decoder.
* @param input the input stream to read from
*/
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/RLE.cc
----------------------------------------------------------------------
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index 51bd628..f4a5402 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -22,10 +22,30 @@
namespace orc {
+  // Defaulted out of line, so the destructor stays non-inline as RLE.hh asks.
+  RleEncoder::~RleEncoder() = default;
+
RleDecoder::~RleDecoder() {
// PASS
}
+  // Build an integer RLE encoder for `version`. Only RLE v1 exists so far;
+  // any other version throws NotImplementedYet. The pool is currently unused.
+  std::unique_ptr<RleEncoder> createRleEncoder
+                         (std::unique_ptr<BufferedOutputStream> output,
+                          bool isSigned,
+                          RleVersion version,
+                          MemoryPool&) {
+    if (version == RleVersion_1) {
+      // We don't have std::make_unique() yet.
+      return std::unique_ptr<RleEncoder>(
+          new RleEncoderV1(std::move(output), isSigned));
+    }
+    throw NotImplementedYet("Not implemented yet");
+  }
+
std::unique_ptr<RleDecoder> createRleDecoder
(std::unique_ptr<SeekableInputStream> input,
bool isSigned,
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/RLE.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index 0a44c95..43f7aa7 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -19,16 +19,53 @@
#ifndef ORC_RLE_HH
#define ORC_RLE_HH
-#include "Compression.hh"
+#include "io/InputStream.hh"
+#include "io/OutputStream.hh"
#include <memory>
namespace orc {
+  /**
+   * ZigZag-encode a signed value (0,-1,1,-2,... -> 0,1,2,3,...); inverse of
+   * unZigZag(). The left shift is done on uint64_t because shifting a
+   * negative int64_t left is undefined behavior before C++20.
+   */
+  inline int64_t zigZag(int64_t value) {
+    return static_cast<int64_t>((static_cast<uint64_t>(value) << 1) ^
+                                static_cast<uint64_t>(value >> 63));
+  }
+
inline int64_t unZigZag(uint64_t value) {
return value >> 1 ^ -(value & 1);
}
+  /**
+   * Interface for integer run-length encoders; obtain instances from
+   * createRleEncoder().
+   */
+  class RleEncoder {
+  public:
+    // Defined out of line in RLE.cc; must stay non-inline.
+    virtual ~RleEncoder();
+
+    /**
+     * Append a batch of values to the encoder.
+     * @param data values to encode
+     * @param numValues number of entries in data (and in notNull, if given)
+     * @param notNull optional presence mask: nullptr encodes every value,
+     *        otherwise entries whose mask byte is zero are skipped
+     */
+    virtual void add(const int64_t* data, uint64_t numValues,
+                     const char* notNull) = 0;
+
+    /**
+     * @return size of the buffer used so far
+     */
+    virtual uint64_t getBufferSize() const = 0;
+
+    /**
+     * Emit pending values and flush the underlying BufferedOutputStream.
+     * @return flushed data size
+     */
+    virtual uint64_t flush() = 0;
+
+    /**
+     * Record the current position.
+     * @param recorder receives the position entries
+     */
+    virtual void recordPosition(PositionRecorder* recorder) const = 0;
+  };
+
class RleDecoder {
public:
// must be non-inline!
@@ -61,6 +98,19 @@ namespace orc {
};
/**
+ * Create an RLE encoder.
+ * @param output the output stream to write to; the encoder takes ownership
+ * @param isSigned true if the number sequence is signed
+ * @param version version of RLE encoding to use; only RleVersion_1 is
+ * implemented, other versions throw NotImplementedYet
+ * @param pool memory pool to use for allocation (currently unused by the
+ * implementation in RLE.cc)
+ * @return the new encoder
+ */
+ std::unique_ptr<RleEncoder> createRleEncoder
+ (std::unique_ptr<BufferedOutputStream> output,
+ bool isSigned,
+ RleVersion version,
+ MemoryPool& pool);
+
+ /**
* Create an RLE decoder.
* @param input the input stream to read from
* @param isSigned true if the number sequence is signed
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/RLEv1.cc
----------------------------------------------------------------------
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index 4205510..b8b1d72 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -26,8 +26,173 @@
namespace orc {
const uint64_t MINIMUM_REPEAT = 3;
+// Longest repeat run: the header byte stores (run length - MINIMUM_REPEAT),
+// which is at most 127.
+const uint64_t MAXIMUM_REPEAT = 127 + MINIMUM_REPEAT;
+
const uint64_t BASE_128_MASK = 0x7f;
+// A repeat run's delta is written as a single byte, so it must lie in
+// [MIN_DELTA, MAX_DELTA].
+const int MAX_DELTA = 127;
+const int MIN_DELTA = -128;
+// A literal run stores -(run length) in its header byte.
+const int MAX_LITERAL_SIZE = 128;
+
+// Take ownership of the output stream; start with nothing staged and no
+// output chunk.
+RleEncoderV1::RleEncoderV1(std::unique_ptr<BufferedOutputStream> outStream,
+                           bool hasSigned)
+    : outputStream(std::move(outStream)),
+      isSigned(hasSigned),
+      literals(new int64_t[MAX_LITERAL_SIZE]),
+      numLiterals(0),
+      delta(0),
+      repeat(false),
+      tailRunLength(0),
+      bufferPosition(0),
+      bufferLength(0),
+      buffer(nullptr) {
+}
+
+RleEncoderV1::~RleEncoderV1() {
+  // Only the staging array is owned here; `buffer` belongs to the stream.
+  delete[] literals;
+}
+
+// Feed every present value (per the optional notNull mask) to write().
+void RleEncoderV1::add(const int64_t* data, uint64_t numValues,
+                       const char* notNull) {
+  for (uint64_t idx = 0; idx < numValues; ++idx) {
+    const bool isPresent = (notNull == nullptr) || (notNull[idx] != 0);
+    if (isPresent) {
+      write(data[idx]);
+    }
+  }
+}
+
+// Append one byte, requesting a new chunk from the stream when the current
+// one is full. Throws std::bad_alloc if Next() fails.
+void RleEncoderV1::writeByte(char c) {
+  const bool chunkFull = (bufferPosition == bufferLength);
+  if (chunkFull) {
+    int chunkSize = 0;
+    const bool gotChunk =
+        outputStream->Next(reinterpret_cast<void**>(&buffer), &chunkSize);
+    if (!gotChunk) {
+      throw std::bad_alloc();
+    }
+    bufferLength = chunkSize;
+    bufferPosition = 0;
+  }
+  buffer[bufferPosition] = c;
+  ++bufferPosition;
+}
+
+// Emit the staged values as one run and reset the staging state.
+void RleEncoderV1::writeValues() {
+  if (numLiterals == 0) {
+    return;  // nothing staged
+  }
+  // Values are zigzag varints when signed, plain varints otherwise.
+  auto emitValue = [this](int64_t v) {
+    if (isSigned) {
+      writeVslong(v);
+    } else {
+      writeVulong(v);
+    }
+  };
+
+  if (repeat) {
+    // Repeat run: header, one-byte delta, then the base value.
+    writeByte(static_cast<char>(
+        static_cast<uint64_t>(numLiterals) - MINIMUM_REPEAT));
+    writeByte(static_cast<char>(delta));
+    emitValue(literals[0]);
+  } else {
+    // Literal run: header = -length, then each staged value.
+    writeByte(static_cast<char>(-numLiterals));
+    for (int idx = 0; idx < numLiterals; ++idx) {
+      emitValue(literals[idx]);
+    }
+  }
+  repeat = false;
+  numLiterals = 0;
+  tailRunLength = 0;
+}
+
+// Emit any staged run, hand the unwritten remainder of the current chunk back
+// to the stream via BackUp(), then flush the stream.
+uint64_t RleEncoderV1::flush() {
+  writeValues();
+  const int unusedBytes = bufferLength - bufferPosition;
+  outputStream->BackUp(unusedBytes);
+  const uint64_t flushedSize = outputStream->flush();
+  bufferPosition = 0;
+  bufferLength = 0;
+  return flushedSize;
+}
+
+/**
+ * Feed one value into the run-detection state machine. Staged values live in
+ * literals[0..numLiterals). When `repeat` is true they form an arithmetic
+ * run literals[0] + k * delta. A run is emitted via writeValues() once it
+ * reaches its maximum length or the pattern breaks.
+ */
+void RleEncoderV1::write(int64_t value) {
+  if (numLiterals == 0) {
+    literals[numLiterals++] = value;
+    tailRunLength = 1;
+  } else if (repeat) {
+    if (value == literals[0] + delta * numLiterals) {
+      numLiterals += 1;
+      if (static_cast<uint64_t>(numLiterals) == MAXIMUM_REPEAT) {
+        writeValues();
+      }
+    } else {
+      writeValues();
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    }
+  } else {
+    if (tailRunLength != 1 && value == literals[numLiterals - 1] + delta) {
+      // Extends the arithmetic tail run.
+      tailRunLength += 1;
+    } else {
+      // Seed a new candidate run from the previous value. Only deltas that
+      // fit in the one-byte delta field can start a repeat run.
+      delta = value - literals[numLiterals - 1];
+      tailRunLength = (delta < MIN_DELTA || delta > MAX_DELTA) ? 1 : 2;
+    }
+    if (tailRunLength == static_cast<int>(MINIMUM_REPEAT)) {
+      if (numLiterals + 1 == static_cast<int>(MINIMUM_REPEAT)) {
+        // All staged values belong to the run: convert in place.
+        repeat = true;
+        numLiterals += 1;
+      } else {
+        // Emit the literals before the tail, then restart as a repeat run
+        // based at the first tail value.
+        numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1);
+        // int64_t, not long: long is 32 bits on LLP64 platforms (Windows)
+        // and would truncate the base value.
+        const int64_t base = literals[numLiterals];
+        writeValues();
+        literals[0] = base;
+        repeat = true;
+        numLiterals = static_cast<int>(MINIMUM_REPEAT);
+      }
+    } else {
+      literals[numLiterals++] = value;
+      if (numLiterals == MAX_LITERAL_SIZE) {
+        writeValues();
+      }
+    }
+  }
+}
+
+/**
+ * Write a signed value as a zigzag-encoded base-128 varint. The left shift is
+ * done on uint64_t because shifting a negative int64_t left is undefined
+ * behavior before C++20.
+ */
+void RleEncoderV1::writeVslong(int64_t val) {
+  writeVulong(static_cast<int64_t>((static_cast<uint64_t>(val) << 1) ^
+                                   static_cast<uint64_t>(val >> 63)));
+}
+
+// Write `val` as a base-128 varint: 7 payload bits per byte, low group first,
+// with the high bit set on every byte except the last.
+void RleEncoderV1::writeVulong(int64_t val) {
+  while ((val & ~0x7f) != 0) {
+    writeByte(static_cast<char>(0x80 | (val & 0x7f)));
+    // cast val to unsigned so as to force 0-fill right shift
+    val = static_cast<int64_t>(static_cast<uint64_t>(val) >> 7);
+  }
+  writeByte(static_cast<char>(val));
+}
+
+// Record the output position followed by the number of staged values.
+void RleEncoderV1::recordPosition(PositionRecorder* recorder) const {
+  const uint64_t streamSize = outputStream->getSize();
+  const uint64_t usedInChunk = static_cast<uint64_t>(bufferPosition);
+  if (!outputStream->isCompressed()) {
+    // Stream size less the not-yet-written remainder of the current chunk.
+    const uint64_t chunkStart =
+        streamSize - static_cast<uint64_t>(bufferLength);
+    recorder->add(chunkStart + usedInChunk);
+  } else {
+    recorder->add(streamSize);
+    recorder->add(usedInChunk);
+  }
+  recorder->add(static_cast<uint64_t>(numLiterals));
+}
+
signed char RleDecoderV1::readByte() {
if (bufferStart == bufferEnd) {
int bufferLength;
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/src/RLEv1.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index 95e50a3..6b0855e 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -26,6 +26,59 @@
namespace orc {
+/**
+ * Integer RLE version 1 encoder. It emits repeat runs (header, one-byte
+ * delta, base value) and literal runs (negative header, values), writing
+ * values as varints, zigzag-encoded when signed.
+ */
+class RleEncoderV1 : public RleEncoder {
+public:
+  RleEncoderV1(std::unique_ptr<BufferedOutputStream> outStream,
+               bool hasSigned);
+  ~RleEncoderV1() override;
+
+  /**
+   * Encode the next batch of values.
+   * @param data the array to be written
+   * @param numValues the number of values to write
+   * @param notNull If the pointer is null, all values are written. If the
+   * pointer is not null, positions that are false are skipped.
+   */
+  void add(const int64_t* data, uint64_t numValues,
+           const char* notNull) override;
+
+  /**
+   * Get size of buffer used so far (as reported by the output stream).
+   */
+  uint64_t getBufferSize() const override {
+    return outputStream->getSize();
+  }
+
+  /**
+   * Emit pending values and flush the underlying BufferedOutputStream.
+   * @return the value returned by the stream's flush()
+   */
+  uint64_t flush() override;
+
+  /**
+   * record current position
+   * @param recorder use the recorder to record current positions
+   */
+  void recordPosition(PositionRecorder* recorder) const override;
+
+private:
+  std::unique_ptr<BufferedOutputStream> outputStream;
+  bool isSigned;        // zigzag-encode values before the varint
+  int64_t* literals;    // staged values; array of MAX_LITERAL_SIZE entries
+  int numLiterals;      // number of staged values
+  int64_t delta;        // step of the current/candidate repeat run
+  bool repeat;          // true when staged values form a repeat run
+  int tailRunLength;    // length of the arithmetic run at the tail
+  int bufferPosition;   // next write offset inside `buffer`
+  int bufferLength;     // size of the chunk obtained from outputStream
+  char* buffer;         // current output chunk (not owned)
+
+  void write(int64_t val);
+  void writeByte(char c);
+  void writeVulong(int64_t val);
+  void writeVslong(int64_t val);
+  void writeValues();
+};
+
class RleDecoderV1 : public RleDecoder {
public:
RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
@@ -60,8 +113,8 @@ private:
const bool isSigned;
uint64_t remainingValues;
int64_t value;
- const char *bufferStart;
- const char *bufferEnd;
+ const char* bufferStart;
+ const char* bufferEnd;
int64_t delta;
bool repeating;
};
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index 57a0b9f..f53e553 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -27,6 +27,7 @@ add_executable (orc-test
MemoryOutputStream.cc
TestBufferedOutputStream.cc
TestByteRle.cc
+ TestByteRLEEncoder.cc
TestColumnPrinter.cc
TestColumnReader.cc
TestCompression.cc
@@ -35,6 +36,7 @@ add_executable (orc-test
TestInt128.cc
TestReader.cc
TestRle.cc
+ TestRLEv1Encoder.cc
TestStripeIndexStatistics.cc
TestTimestampStatistics.cc
TestTimezone.cc
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/test/TestByteRLEEncoder.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestByteRLEEncoder.cc b/c++/test/TestByteRLEEncoder.cc
new file mode 100644
index 0000000..c5db559
--- /dev/null
+++ b/c++/test/TestByteRLEEncoder.cc
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ByteRLE.hh"
+#include "MemoryOutputStream.hh"
+
+#include "wrap/gtest-wrapper.h"
+#include "wrap/orc-proto-wrapper.hh"
+
+#include <cstdlib>
+
+namespace orc {
+
+  const int DEFAULT_MEM_STREAM_SIZE = 1 << 20;  // 1 MiB of backing memory
+
+  /**
+   * Fill notNull with 1s, then clear up to numNulls randomly chosen slots.
+   * numNulls is clamped to numValues so the rejection loop always
+   * terminates. A null mask or zero values is a no-op, which avoids the
+   * modulo-by-zero in the original.
+   * @param numValues length of the mask
+   * @param numNulls number of slots to mark as null
+   * @param notNull mask to fill, or nullptr
+   */
+  void generateNotNull(uint64_t numValues,
+                       uint64_t numNulls,
+                       char* notNull) {
+    if (notNull == nullptr || numValues == 0) {
+      return;
+    }
+    // Always initialize the mask, even when no nulls are requested.
+    memset(notNull, 1, numValues);
+    if (numNulls > numValues) {
+      numNulls = numValues;
+    }
+    while (numNulls > 0) {
+      uint64_t pos = static_cast<uint64_t>(std::rand()) % numValues;
+      if (notNull[pos]) {
+        notNull[pos] = static_cast<char>(0);
+        --numNulls;
+      }
+    }
+  }
+
+  // Fill `data` with random bytes, optionally generating a null mask first.
+  void generateData(uint64_t numValues,
+                    char* data,
+                    uint64_t numNulls = 0,
+                    char* notNull = nullptr) {
+    generateNotNull(numValues, numNulls, notNull);
+    for (char* out = data; out != data + numValues; ++out) {
+      *out = static_cast<char>(std::rand() % 256);
+    }
+  }
+
+  // Fill `data` with random 0/1 values, optionally generating a null mask.
+  void generateBoolData(uint64_t numValues,
+                        char* data,
+                        uint64_t numNulls = 0,
+                        char* notNull = nullptr) {
+    generateNotNull(numValues, numNulls, notNull);
+    for (char* out = data; out != data + numValues; ++out) {
+      *out = static_cast<char>(std::rand() % 2);
+    }
+  }
+
+  /**
+   * Decode memStream with a byte RLE decoder and compare every present value
+   * against the original input.
+   * @param memStream stream holding the encoded bytes
+   * @param data original values
+   * @param numValues number of values to decode
+   * @param notNull presence mask passed to the decoder, or nullptr
+   */
+  void decodeAndVerify(
+      const MemoryOutputStream& memStream,
+      const char* data,
+      uint64_t numValues,
+      char* notNull) {
+    std::unique_ptr<SeekableInputStream> inStream(
+        new SeekableArrayInputStream(memStream.getData(),
+                                     memStream.getLength()));
+
+    std::unique_ptr<ByteRleDecoder> decoder =
+        createByteRleDecoder(std::move(inStream));
+
+    // unique_ptr releases the buffer even if next() throws.
+    std::unique_ptr<char[]> decodedData(new char[numValues]);
+    decoder->next(decodedData.get(), numValues, notNull);
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        EXPECT_EQ(data[i], decodedData[i]);
+      }
+    }
+  }
+
+  /**
+   * Decode memStream with a boolean RLE decoder and check that the truth
+   * value of every present entry matches the original input.
+   * @param memStream stream holding the encoded bytes
+   * @param data original values (any non-zero byte means true)
+   * @param numValues number of values to decode
+   * @param notNull presence mask passed to the decoder, or nullptr
+   */
+  void decodeAndVerifyBoolean(
+      const MemoryOutputStream& memStream,
+      const char* data,
+      uint64_t numValues,
+      char* notNull) {
+    std::unique_ptr<SeekableInputStream> inStream(
+        new SeekableArrayInputStream(memStream.getData(),
+                                     memStream.getLength()));
+
+    std::unique_ptr<ByteRleDecoder> decoder =
+        createBooleanRleDecoder(std::move(inStream));
+
+    // unique_ptr releases the buffer even if next() throws.
+    std::unique_ptr<char[]> decodedData(new char[numValues]);
+    decoder->next(decodedData.get(), numValues, notNull);
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        bool expect = data[i] != 0;
+        bool actual = decodedData[i] != 0;
+        EXPECT_EQ(expect, actual);
+      }
+    }
+  }
+
+  // Round-trip random bytes with no nulls. (Removed an unused
+  // BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(ByteRleEncoder, random_chars) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+        createByteRleEncoder(std::move(outStream));
+
+    const uint64_t numValues = 102400;
+    std::unique_ptr<char[]> data(new char[numValues]);
+    generateData(numValues, data.get());
+    encoder->add(data.get(), numValues, nullptr);
+    encoder->flush();
+
+    decodeAndVerify(memStream, data.get(), numValues, nullptr);
+  }
+
+  // Round-trip random bytes with 377 random nulls. (Removed an unused
+  // BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(ByteRleEncoder, random_chars_with_null) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+        createByteRleEncoder(std::move(outStream));
+
+    const uint64_t numValues = 102400;
+    std::unique_ptr<char[]> notNull(new char[numValues]);
+    std::unique_ptr<char[]> data(new char[numValues]);
+    generateData(numValues, data.get(), 377, notNull.get());
+    encoder->add(data.get(), numValues, notNull.get());
+    encoder->flush();
+
+    decodeAndVerify(memStream, data.get(), numValues, notNull.get());
+  }
+
+  // Round-trip a bit count that is not a multiple of 8, so flush() must emit
+  // a partial byte. (Removed an unused local and raw new[]/delete[].)
+  TEST(BooleanRleEncoder, random_bits_not_aligned) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+        createBooleanRleEncoder(std::move(outStream));
+
+    const uint64_t numValues = 1779;
+    std::unique_ptr<char[]> data(new char[numValues]);
+    generateBoolData(numValues, data.get());
+    encoder->add(data.get(), numValues, nullptr);
+    encoder->flush();
+
+    decodeAndVerifyBoolean(memStream, data.get(), numValues, nullptr);
+  }
+
+  // Round-trip a bit count that is a multiple of 8. (Removed an unused
+  // BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(BooleanRleEncoder, random_bits_aligned) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+        createBooleanRleEncoder(std::move(outStream));
+
+    const uint64_t numValues = 8000;
+    std::unique_ptr<char[]> data(new char[numValues]);
+    generateBoolData(numValues, data.get());
+    encoder->add(data.get(), numValues, nullptr);
+    encoder->flush();
+
+    decodeAndVerifyBoolean(memStream, data.get(), numValues, nullptr);
+  }
+
+  // Round-trip 8000 bits with 515 random nulls. (Removed an unused
+  // BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(BooleanRleEncoder, random_bits_aligned_with_null) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    std::unique_ptr<BufferedOutputStream> outStream(
+        new BufferedOutputStream(*pool, &memStream, capacity, block));
+
+    std::unique_ptr<ByteRleEncoder> encoder =
+        createBooleanRleEncoder(std::move(outStream));
+
+    const uint64_t numValues = 8000;
+    std::unique_ptr<char[]> notNull(new char[numValues]);
+    std::unique_ptr<char[]> data(new char[numValues]);
+    generateBoolData(numValues, data.get(), 515, notNull.get());
+    encoder->add(data.get(), numValues, notNull.get());
+    encoder->flush();
+
+    decodeAndVerifyBoolean(memStream, data.get(), numValues, notNull.get());
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/44c29108/c++/test/TestRLEv1Encoder.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestRLEv1Encoder.cc b/c++/test/TestRLEv1Encoder.cc
new file mode 100644
index 0000000..9762a5c
--- /dev/null
+++ b/c++/test/TestRLEv1Encoder.cc
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstdlib>
+
+#include "MemoryOutputStream.hh"
+#include "RLEv1.hh"
+
+#include "wrap/orc-proto-wrapper.hh"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  const int DEFAULT_MEM_STREAM_SIZE = 1 << 20;  // 1 MiB of backing memory
+
+  /**
+   * Fill `data` with an arithmetic sequence (start + delta * i) or with
+   * std::rand() values. If `notNull` is given, it is first filled with 1s
+   * and up to numNulls random slots are cleared; null slots of `data` are
+   * left untouched. numNulls is clamped to numValues so the loop terminates,
+   * and the mask is always initialized before it is read.
+   */
+  void generateData(
+      uint64_t numValues,
+      int64_t start,
+      int64_t delta,
+      bool random,
+      int64_t* data,
+      uint64_t numNulls = 0,
+      char* notNull = nullptr) {
+    if (notNull != nullptr && numValues != 0) {
+      memset(notNull, 1, numValues);
+      if (numNulls > numValues) {
+        numNulls = numValues;
+      }
+      while (numNulls > 0) {
+        uint64_t pos = static_cast<uint64_t>(std::rand()) % numValues;
+        if (notNull[pos]) {
+          notNull[pos] = static_cast<char>(0);
+          --numNulls;
+        }
+      }
+    }
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (notNull == nullptr || notNull[i]) {
+        if (!random) {
+          data[i] = start + delta * static_cast<int64_t>(i);
+        } else {
+          data[i] = std::rand();
+        }
+      }
+    }
+  }
+
+  /**
+   * Decode memStream with RleDecoderV1 and compare every present value
+   * against the original input.
+   * @param memStream stream holding the encoded bytes
+   * @param data original values
+   * @param numValues number of values to decode
+   * @param notNull presence mask passed to the decoder, or nullptr
+   * @param isSigned whether the stream was encoded as signed
+   */
+  void decodeAndVerify(
+      const MemoryOutputStream& memStream,
+      const int64_t* data,
+      uint64_t numValues,
+      const char* notNull,
+      bool isSigned) {
+    RleDecoderV1 decoder(
+        std::unique_ptr<SeekableInputStream>(
+            new SeekableArrayInputStream(
+                memStream.getData(),
+                memStream.getLength())),
+        isSigned);
+
+    // unique_ptr releases the buffer even if next() throws.
+    std::unique_ptr<int64_t[]> decodedData(new int64_t[numValues]);
+    decoder.next(decodedData.get(), numValues, notNull);
+
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        EXPECT_EQ(data[i], decodedData[i]);
+      }
+    }
+  }
+
+  // Unsigned 0,1,2,... should round-trip via repeat runs. (Removed an unused
+  // BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(RleEncoderV1, delta_increasing_sequance_unsigned) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    RleEncoderV1 encoder(
+        std::unique_ptr<BufferedOutputStream>(
+            new BufferedOutputStream(*pool, &memStream, capacity, block)),
+        false);
+
+    std::unique_ptr<int64_t[]> data(new int64_t[1024]);
+    generateData(1024, 0, 1, false, data.get());
+    encoder.add(data.get(), 1024, nullptr);
+    encoder.flush();
+
+    decodeAndVerify(memStream, data.get(), 1024, nullptr, false);
+  }
+
+  // Unsigned increasing sequence with 100 random nulls. (Removed an unused
+  // BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(RleEncoderV1, delta_increasing_sequance_unsigned_null) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    RleEncoderV1 encoder(
+        std::unique_ptr<BufferedOutputStream>(
+            new BufferedOutputStream(*pool, &memStream, capacity, block)),
+        false);
+
+    std::unique_ptr<char[]> notNull(new char[1024]);
+    std::unique_ptr<int64_t[]> data(new int64_t[1024]);
+    generateData(1024, 0, 1, false, data.get(), 100, notNull.get());
+    encoder.add(data.get(), 1024, notNull.get());
+    encoder.flush();
+
+    decodeAndVerify(memStream, data.get(), 1024, notNull.get(), false);
+  }
+
+  // Unsigned 5000,4997,... (negative delta) should round-trip. (Removed an
+  // unused BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(RleEncoderV1, delta_decreasing_sequance_unsigned) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    RleEncoderV1 encoder(
+        std::unique_ptr<BufferedOutputStream>(
+            new BufferedOutputStream(*pool, &memStream, capacity, block)),
+        false);
+
+    std::unique_ptr<int64_t[]> data(new int64_t[1024]);
+    generateData(1024, 5000, -3, false, data.get());
+    encoder.add(data.get(), 1024, nullptr);
+    encoder.flush();
+
+    decodeAndVerify(memStream, data.get(), 1024, nullptr, false);
+  }
+
+  // Signed 100,97,... crosses zero into negatives. (Removed an unused
+  // BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(RleEncoderV1, delta_decreasing_sequance_signed) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    RleEncoderV1 encoder(
+        std::unique_ptr<BufferedOutputStream>(
+            new BufferedOutputStream(*pool, &memStream, capacity, block)),
+        true);
+
+    std::unique_ptr<int64_t[]> data(new int64_t[1024]);
+    generateData(1024, 100, -3, false, data.get());
+    encoder.add(data.get(), 1024, nullptr);
+    encoder.flush();
+
+    decodeAndVerify(memStream, data.get(), 1024, nullptr, true);
+  }
+
+  // Signed decreasing sequence with 500 random nulls. (Removed an unused
+  // BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(RleEncoderV1, delta_decreasing_sequance_signed_null) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    RleEncoderV1 encoder(
+        std::unique_ptr<BufferedOutputStream>(
+            new BufferedOutputStream(*pool, &memStream, capacity, block)),
+        true);
+
+    std::unique_ptr<char[]> notNull(new char[1024]);
+    std::unique_ptr<int64_t[]> data(new int64_t[1024]);
+    generateData(1024, 100, -3, false, data.get(), 500, notNull.get());
+    encoder.add(data.get(), 1024, notNull.get());
+    encoder.flush();
+
+    decodeAndVerify(memStream, data.get(), 1024, notNull.get(), true);
+  }
+
+  // Random values should round-trip mostly as literal runs. (Removed an
+  // unused BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(RleEncoderV1, random_sequance_signed) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    RleEncoderV1 encoder(
+        std::unique_ptr<BufferedOutputStream>(
+            new BufferedOutputStream(*pool, &memStream, capacity, block)),
+        true);
+
+    std::unique_ptr<int64_t[]> data(new int64_t[1024]);
+    generateData(1024, 0, 0, true, data.get());
+    encoder.add(data.get(), 1024, nullptr);
+    encoder.flush();
+
+    decodeAndVerify(memStream, data.get(), 1024, nullptr, true);
+  }
+
+  // Every slot null: nothing is encoded and nothing is compared. (Removed an
+  // unused BufferedOutputStream local and the raw new[]/delete[].)
+  TEST(RleEncoderV1, all_null) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+
+    uint64_t capacity = 500 * 1024;
+    uint64_t block = 1024;
+    RleEncoderV1 encoder(
+        std::unique_ptr<BufferedOutputStream>(
+            new BufferedOutputStream(*pool, &memStream, capacity, block)),
+        true);
+
+    std::unique_ptr<char[]> notNull(new char[1024]);
+    std::unique_ptr<int64_t[]> data(new int64_t[1024]);
+    generateData(1024, 100, -3, false, data.get(), 1024, notNull.get());
+    encoder.add(data.get(), 1024, notNull.get());
+    encoder.flush();
+
+    decodeAndVerify(memStream, data.get(), 1024, notNull.get(), true);
+  }
+}