You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by no...@apache.org on 2016/01/09 00:51:54 UTC
[3/7] parquet-cpp git commit: PARQUET-416: C++11 compilation,
code reorg, libparquet and installation targets
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/impala/rle-encoding.h
----------------------------------------------------------------------
diff --git a/src/impala/rle-encoding.h b/src/impala/rle-encoding.h
deleted file mode 100644
index 759f917..0000000
--- a/src/impala/rle-encoding.h
+++ /dev/null
@@ -1,417 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef IMPALA_RLE_ENCODING_H
-#define IMPALA_RLE_ENCODING_H
-
-#include <math.h>
-
-#include "impala/compiler-util.h"
-#include "impala/bit-stream-utils.inline.h"
-#include "impala/bit-util.h"
-#include "impala/logging.h"
-
-namespace impala {
-
-// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs
-// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
-// (literal encoding).
-// For both types of runs, there is a byte-aligned indicator which encodes the length
-// of the run and the type of the run.
-// This encoding has the benefit that when there aren't any long enough runs, values
-// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
-// the run length are byte aligned. This allows for very efficient decoding
-// implementations.
-// The encoding is:
-// encoded-block := run*
-// run := literal-run | repeated-run
-// literal-run := literal-indicator < literal bytes >
-// repeated-run := repeated-indicator < repeated value. padded to byte boundary >
-// literal-indicator := varint_encode( number_of_groups << 1 | 1)
-// repeated-indicator := varint_encode( number_of_repetitions << 1 )
-//
-// Each run is preceded by a varint. The varint's least significant bit is
-// used to indicate whether the run is a literal run or a repeated run. The rest
-// of the varint is used to determine the length of the run (e.g. how many times the
-// value repeats).
-//
-// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
-// in groups of 8), so that no matter the bit-width of the value, the sequence will end
-// on a byte boundary without padding.
-// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
-// the actual number of encoded ints. (This means that the total number of encoded values
-// cannot be determined from the encoded data, since the last group may contain fewer
-// than 8 real values.) For the last group of literal runs, we pad
-// the group to 8 with zeros. This allows for 8-at-a-time decoding on the read side
-// without the need for additional checks.
-//
-// There is a break-even point when it is more storage efficient to do run length
-// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes
-// for either the repeated encoding or the literal encoding. This value can always
-// be computed based on the bit-width.
-// TODO: think about how to use this for strings. The bit packing isn't quite the same.
-//
-// Examples with bit-width 1 (e.g. encoding booleans):
-// ----------------------------------------
-// 100 1s followed by 100 0s:
-// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
-// - (total 4 bytes)
-//
-// alternating 1s and 0s (200 total):
-// 200 ints = 25 groups of 8
-// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
-// (total 26 bytes, 1 byte overhead)
-//
-
-// Decoder for RLE / bit-packed hybrid encoded data (format described above).
-// Values are pulled one at a time with Get<T>().
-class RleDecoder {
- public:
-  // Create a decoder over the encoded bytes buffer[0, buffer_len).
-  // bit_width is the width of each value (before encoding), in [0, 64].
-  RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
-    : bit_reader_(buffer, buffer_len),
-      bit_width_(bit_width),
-      current_value_(0),
-      repeat_count_(0),
-      literal_count_(0) {
-    DCHECK_GE(bit_width_, 0);
-    DCHECK_LE(bit_width_, 64);
-  }
-
-  // Creates an empty decoder that is expected to be assigned over later.
-  // The run state is zero-initialized so a default-constructed decoder never
-  // holds indeterminate counters; bit_reader_ is default-constructed.
-  RleDecoder()
-    : bit_width_(0),
-      current_value_(0),
-      repeat_count_(0),
-      literal_count_(0) {}
-
-  // Gets the next value. Returns false if there are no more.
-  template<typename T>
-  bool Get(T* val);
-
- private:
-  BitReader bit_reader_;
-  int bit_width_;
-  // Value of the current repeated run.
-  uint64_t current_value_;
-  // Values left in the current repeated run.
-  uint32_t repeat_count_;
-  // Values left in the current literal (bit-packed) run.
-  uint32_t literal_count_;
-};
-
-// Incrementally builds RLE / bit-packed hybrid data (format described above) in
-// a caller-supplied buffer. This class does not allocate any memory.
-// The encoding has two modes: encoding repeated runs and literal runs.
-// If the run is sufficiently short, it is more efficient to encode as a literal run.
-// This class does so by buffering 8 values at a time. If they are not all the same
-// they are added to the literal run. If they are the same, they are added to the
-// repeated run. When we switch modes, the previous run is flushed out.
-class RleEncoder {
- public:
-  // buffer/buffer_len: preallocated output buffer; buffer_len must be at least
-  //   MinBufferSize(bit_width) (checked with DCHECK only).
-  // bit_width: max number of bits for value, in [1, 64].
-  // TODO: consider adding a min_repeated_run_length so the caller can control
-  // when values should be encoded as repeated runs. Currently this is derived
-  // based on the bit_width, which can determine a storage optimal choice.
-  // TODO: allow 0 bit_width (and have dict encoder use it)
-  RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
-    : bit_width_(bit_width),
-      bit_writer_(buffer, buffer_len) {
-    DCHECK_GE(bit_width_, 1);
-    DCHECK_LE(bit_width_, 64);
-    max_run_byte_size_ = MinBufferSize(bit_width);
-    DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough.";
-    Clear();
-  }
-
-  // Returns the minimum buffer size needed to use the encoder for 'bit_width'.
-  // This is the maximum length of a single run for 'bit_width'.
-  // It is not valid to pass a buffer less than this length.
-  static int MinBufferSize(int bit_width) {
-    // 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
-    int max_literal_run_size = 1 +
-        BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8);
-    // Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value.
-    int max_repeated_run_size = BitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8);
-    return std::max(max_literal_run_size, max_repeated_run_size);
-  }
-
-  // Returns the maximum byte size it could take to encode 'num_values'.
-  static int MaxBufferSize(int bit_width, int num_values) {
-    // Integer divisor, consistent with MinBufferSize().
-    int bytes_per_run = BitUtil::Ceil(bit_width * MAX_VALUES_PER_LITERAL_RUN, 8);
-    int num_runs = BitUtil::Ceil(num_values, MAX_VALUES_PER_LITERAL_RUN);
-    // Size if every value went into literal runs: one indicator byte per run
-    // plus the bit-packed values.
-    int literal_max_size = num_runs + num_runs * bytes_per_run;
-    int min_run_size = MinBufferSize(bit_width);
-    return std::max(min_run_size, literal_max_size) + min_run_size;
-  }
-
-  // Encode value. Returns true if the value fits in buffer, false otherwise.
-  // This value must be representable with bit_width_ bits.
-  bool Put(uint64_t value);
-
-  // Flushes any pending values to the underlying buffer.
-  // Returns the total number of bytes written.
-  int Flush();
-
-  // Resets all the state in the encoder.
-  void Clear();
-
-  // Returns pointer to underlying buffer.
-  uint8_t* buffer() { return bit_writer_.buffer(); }
-  // Returns the number of bytes written to the underlying buffer so far.
-  int32_t len() { return bit_writer_.bytes_written(); }
-
- private:
-  // Flushes any buffered values. If this is part of a repeated run, this is largely
-  // a no-op.
-  // If it is part of a literal run, this will call FlushLiteralRun, which writes
-  // out the buffered literal values.
-  // If 'done' is true, the current run would be written even if it would normally
-  // have been buffered more. This should only be called at the end, when the
-  // encoder has received all values even if it would normally continue to be
-  // buffered.
-  void FlushBufferedValues(bool done);
-
-  // Flushes literal values to the underlying buffer. If update_indicator_byte,
-  // then the current literal run is complete and the indicator byte is updated.
-  void FlushLiteralRun(bool update_indicator_byte);
-
-  // Flushes a repeated run to the underlying buffer.
-  void FlushRepeatedRun();
-
-  // Checks and sets buffer_full_. This must be called after flushing a run to
-  // make sure there are enough bytes remaining to encode the next run.
-  void CheckBufferFull();
-
-  // The maximum number of values in a single literal run
-  // (number of groups encodable by a 1-byte indicator * 8)
-  static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8;
-
-  // Number of bits needed to encode the value.
-  const int bit_width_;
-
-  // Underlying buffer.
-  BitWriter bit_writer_;
-
-  // If true, the buffer is full and subsequent Put()'s will fail.
-  bool buffer_full_;
-
-  // The maximum byte size a single run can take.
-  int max_run_byte_size_;
-
-  // We need to buffer at most 8 values for literals. This happens when the
-  // bit_width is 1 (so 8 values fit in one byte).
-  // TODO: generalize this to other bit widths
-  int64_t buffered_values_[8];
-
-  // Number of values in buffered_values_.
-  int num_buffered_values_;
-
-  // The current (also last) value that was written and the count of how
-  // many times in a row that value has been seen. This is maintained even
-  // if we are in a literal run. If repeat_count_ gets high enough, we switch
-  // to encoding repeated runs.
-  int64_t current_value_;
-  int repeat_count_;
-
-  // Number of literals in the current run. This does not include the literals
-  // that might be in buffered_values_. Only after we've got a group big enough
-  // can we decide if they should be part of literal_count_ or repeat_count_.
-  int literal_count_;
-
-  // Pointer to a byte in the underlying buffer that stores the indicator byte.
-  // This is reserved as soon as we need a literal run but the value is written
-  // when the literal run is complete.
-  uint8_t* literal_indicator_byte_;
-};
-
-// Decodes the next value into *val. Returns false when no further run
-// indicator can be read (end of data) or when the bytes for a value cannot be
-// read, instead of silently producing a stale value in release builds.
-template<typename T>
-inline bool RleDecoder::Get(T* val) {
-  if (UNLIKELY(literal_count_ == 0 && repeat_count_ == 0)) {
-    // Read the next run's indicator int, it could be a literal or repeated run.
-    // The int is encoded as a vlq-encoded value.
-    uint64_t indicator_value = 0;
-    if (!bit_reader_.GetVlqInt(&indicator_value)) return false;
-
-    // lsb indicates if it is a literal run or repeated run
-    bool is_literal = indicator_value & 1;
-    if (is_literal) {
-      // Literal runs are counted in groups of 8 values.
-      literal_count_ = (indicator_value >> 1) * 8;
-    } else {
-      // The repeated value is stored byte-aligned in ceil(bit_width_ / 8) bytes.
-      // Read it directly into the uint64_t member (zeroed first) rather than
-      // aliasing that member through a T*.
-      current_value_ = 0;
-      if (!bit_reader_.GetAligned<uint64_t>(BitUtil::Ceil(bit_width_, 8),
-          &current_value_)) {
-        return false;
-      }
-      repeat_count_ = indicator_value >> 1;
-    }
-  }
-
-  if (LIKELY(repeat_count_ > 0)) {
-    *val = current_value_;
-    --repeat_count_;
-  } else {
-    DCHECK(literal_count_ > 0);
-    if (!bit_reader_.GetValue(bit_width_, val)) return false;
-    --literal_count_;
-  }
-
-  return true;
-}
-
-// This function buffers input values 8 at a time. After seeing all 8 values,
-// it decides whether they should be encoded as a literal or repeated run.
-// Returns false (without recording 'value') if buffer_full_ was set by an
-// earlier flush; otherwise returns true.
-inline bool RleEncoder::Put(uint64_t value) {
- // 'value' must be representable in bit_width_ bits.
- DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
- if (UNLIKELY(buffer_full_)) return false;
-
- // NOTE(review): current_value_ is int64_t while 'value' is uint64_t; this
- // comparison converts current_value_ to uint64_t.
- if (LIKELY(current_value_ == value)) {
- ++repeat_count_;
- if (repeat_count_ > 8) {
- // This is just a continuation of the current run, no need to buffer the
- // values.
- // Note that this is the fast path for long repeated runs.
- return true;
- }
- } else {
- if (repeat_count_ >= 8) {
- // We had a run that was long enough but it has ended. Flush the
- // current repeated run.
- DCHECK_EQ(literal_count_, 0);
- FlushRepeatedRun();
- }
- repeat_count_ = 1;
- current_value_ = value;
- }
-
- // Buffer the value; once 8 are buffered, FlushBufferedValues decides whether
- // they continue a repeated run or are written as literals.
- buffered_values_[num_buffered_values_] = value;
- if (++num_buffered_values_ == 8) {
- DCHECK_EQ(literal_count_ % 8, 0);
- FlushBufferedValues(false);
- }
- return true;
-}
-
-// Writes all buffered values as bit-packed literals, first reserving the
-// literal run's one-byte indicator if none is reserved yet. When
-// update_indicator_byte is true the literal run is closed: the indicator byte is
-// filled in from literal_count_, the literal state is reset and
-// CheckBufferFull() is called.
-inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) {
- if (literal_indicator_byte_ == NULL) {
- // The literal indicator byte has not been reserved yet, get one now.
- literal_indicator_byte_ = bit_writer_.GetNextBytePtr();
- DCHECK(literal_indicator_byte_ != NULL);
- }
-
- // Write all the buffered values as bit packed literals
- for (int i = 0; i < num_buffered_values_; ++i) {
- bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_);
- DCHECK(success) << "There is a bug in using CheckBufferFull()";
- }
- num_buffered_values_ = 0;
-
- if (update_indicator_byte) {
- // At this point we need to write the indicator byte for the literal run.
- // We only reserve one byte, to allow for streaming writes of literal values.
- // The logic makes sure we flush literal runs often enough to not overrun
- // the 1 byte.
- DCHECK_EQ(literal_count_ % 8, 0);
- int num_groups = literal_count_ / 8;
- // Group count shifted left with lsb 1, marking a literal run.
- int32_t indicator_value = (num_groups << 1) | 1;
- DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
- *literal_indicator_byte_ = indicator_value;
- literal_indicator_byte_ = NULL;
- literal_count_ = 0;
- CheckBufferFull();
- }
-}
-
-// Emits the pending repeated run: a varint indicator (run length << 1, lsb 0)
-// followed by current_value_ in ceil(bit_width_ / 8) bytes. Then resets the
-// repeat/buffered state and re-checks the remaining buffer space.
-inline void RleEncoder::FlushRepeatedRun() {
-  DCHECK_GT(repeat_count_, 0);
-  // A clear lsb marks a repeated run.
-  const int32_t indicator = repeat_count_ << 1;
-  bool ok = bit_writer_.PutVlqInt(indicator);
-  ok &= bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
-  DCHECK(ok);
-  repeat_count_ = 0;
-  num_buffered_values_ = 0;
-  CheckBufferFull();
-}
-
-// Flush the values that have been buffered. At this point we decide whether
-// we need to switch between the run types or continue the current one.
-// - repeat_count_ >= 8: the buffered values belong to the repeated run, so they
-//   are discarded and any open literal run is closed.
-// - otherwise: the buffered values are appended to the literal run, which is
-//   closed if 'done' is set or its one-byte indicator cannot take another group.
-inline void RleEncoder::FlushBufferedValues(bool done) {
- if (repeat_count_ >= 8) {
- // Clear the buffered values. They are part of the repeated run now and we
- // don't want to flush them out as literals.
- num_buffered_values_ = 0;
- if (literal_count_ != 0) {
- // There was a current literal run. All the values in it have been flushed
- // but we still need to update the indicator byte.
- DCHECK_EQ(literal_count_ % 8, 0);
- DCHECK_EQ(repeat_count_, 8);
- FlushLiteralRun(true);
- }
- DCHECK_EQ(literal_count_, 0);
- return;
- }
-
- literal_count_ += num_buffered_values_;
- DCHECK_EQ(literal_count_ % 8, 0);
- int num_groups = literal_count_ / 8;
- // The indicator is read back as a varint (GetVlqInt in RleDecoder::Get); a
- // one-byte varint holds values below 0x80, i.e. at most 63 groups.
- if (num_groups + 1 >= (1 << 6)) {
- // We need to start a new literal run because the indicator byte we've reserved
- // cannot store more values.
- DCHECK(literal_indicator_byte_ != NULL);
- FlushLiteralRun(true);
- } else {
- FlushLiteralRun(done);
- }
- // These values were written as literals, so a repeated run must start counting
- // afresh.
- repeat_count_ = 0;
-}
-
-// Writes out everything still pending and returns the total number of bytes
-// written to the underlying buffer. Pending values form a repeated run only if
-// no literal run is open and every buffered value (if any) is part of the
-// current repeat; otherwise the last literal group is zero-padded to 8 values
-// and the literal run is closed.
-inline int RleEncoder::Flush() {
- if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
- bool all_repeat = literal_count_ == 0 &&
- (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
- // There is something pending, figure out if it's a repeated or literal run
- if (repeat_count_ > 0 && all_repeat) {
- FlushRepeatedRun();
- } else {
- DCHECK_EQ(literal_count_ % 8, 0);
- // Buffer the last group of literals to 8 by padding with 0s.
- for (; num_buffered_values_ != 0 && num_buffered_values_ < 8;
- ++num_buffered_values_) {
- buffered_values_[num_buffered_values_] = 0;
- }
- literal_count_ += num_buffered_values_;
- FlushLiteralRun(true);
- repeat_count_ = 0;
- }
- }
- bit_writer_.Flush();
- DCHECK_EQ(num_buffered_values_, 0);
- DCHECK_EQ(literal_count_, 0);
- DCHECK_EQ(repeat_count_, 0);
-
- return bit_writer_.bytes_written();
-}
-
-// Marks the encoder full once the space left in the output buffer is smaller
-// than one worst-case run (max_run_byte_size_ bytes). Never clears the flag.
-inline void RleEncoder::CheckBufferFull() {
-  const int remaining = bit_writer_.buffer_len() - bit_writer_.bytes_written();
-  if (remaining < max_run_byte_size_) buffer_full_ = true;
-}
-
-// Returns the encoder to its initial empty state and clears the BitWriter.
-inline void RleEncoder::Clear() {
-  bit_writer_.Clear();
-  literal_indicator_byte_ = NULL;
-  literal_count_ = 0;
-  num_buffered_values_ = 0;
-  repeat_count_ = 0;
-  current_value_ = 0;
-  buffer_full_ = false;
-}
-
-}
-#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet.cc
----------------------------------------------------------------------
diff --git a/src/parquet.cc b/src/parquet.cc
index 6c939ae..f71d32b 100644
--- a/src/parquet.cc
+++ b/src/parquet.cc
@@ -13,9 +13,10 @@
// limitations under the License.
#include "parquet/parquet.h"
-#include "encodings/encodings.h"
-#include "compression/codec.h"
+#include "parquet/encodings/encodings.h"
+#include "parquet/compression/codec.h"
+#include <algorithm>
#include <string>
#include <string.h>
@@ -23,18 +24,21 @@
const int DATA_PAGE_SIZE = 64 * 1024;
-using namespace boost;
-using namespace parquet;
-using namespace std;
-
namespace parquet_cpp {
+using parquet::CompressionCodec;
+using parquet::Encoding;
+using parquet::FieldRepetitionType;
+using parquet::PageType;
+using parquet::SchemaElement;
+using parquet::Type;
+
InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
buffer_(buffer), len_(len), offset_(0) {
}
const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) {
- *num_bytes = ::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
+ *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
return buffer_ + offset_;
}
@@ -47,7 +51,7 @@ const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) {
ColumnReader::~ColumnReader() {
}
-ColumnReader::ColumnReader(const ColumnMetaData* metadata,
+ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
const SchemaElement* schema, InputStream* stream)
: metadata_(metadata),
schema_(schema),
@@ -96,7 +100,7 @@ ColumnReader::ColumnReader(const ColumnMetaData* metadata,
void ColumnReader::BatchDecode() {
buffered_values_offset_ = 0;
- uint8_t* buf= &values_buffer_[0];
+ uint8_t* buf = &values_buffer_[0];
int batch_size = config_.batch_size;
switch (metadata_->type) {
case parquet::Type::BOOLEAN:
@@ -164,7 +168,7 @@ bool ColumnReader::ReadNewPage() {
}
if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
- boost::unordered_map<Encoding::type, boost::shared_ptr<Decoder> >::iterator it =
+ std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it =
decoders_.find(Encoding::RLE_DICTIONARY);
if (it != decoders_.end()) {
throw ParquetException("Column cannot have more than one dictionary.");
@@ -173,7 +177,7 @@ bool ColumnReader::ReadNewPage() {
PlainDecoder dictionary(schema_->type);
dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
buffer, uncompressed_len);
- boost::shared_ptr<Decoder> decoder(
+ std::shared_ptr<Decoder> decoder(
new DictionaryDecoder(schema_->type, &dictionary));
decoders_[Encoding::RLE_DICTIONARY] = decoder;
current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
@@ -187,7 +191,7 @@ bool ColumnReader::ReadNewPage() {
int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
buffer += sizeof(uint32_t);
definition_level_decoder_.reset(
- new impala::RleDecoder(buffer, num_definition_bytes, 1));
+ new RleDecoder(buffer, num_definition_bytes, 1));
buffer += num_definition_bytes;
uncompressed_len -= sizeof(uint32_t);
uncompressed_len -= num_definition_bytes;
@@ -200,14 +204,14 @@ bool ColumnReader::ReadNewPage() {
Encoding::type encoding = current_page_header_.data_page_header.encoding;
if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
- boost::unordered_map<Encoding::type, boost::shared_ptr<Decoder> >::iterator it =
+ std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it =
decoders_.find(encoding);
if (it != decoders_.end()) {
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
- boost::shared_ptr<Decoder> decoder;
+ std::shared_ptr<Decoder> decoder;
if (schema_->type == Type::BOOLEAN) {
decoder.reset(new BoolDecoder());
} else {
@@ -239,5 +243,4 @@ bool ColumnReader::ReadNewPage() {
return true;
}
-}
-
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
new file mode 100644
index 0000000..11eaeb6
--- /dev/null
+++ b/src/parquet/CMakeLists.txt
@@ -0,0 +1,18 @@
+# Copyright 2015 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Headers: top level. Installs the public parquet.h header into
+# include/parquet (relative to the install prefix).
+install(FILES
+ parquet.h
+ DESTINATION include/parquet)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/compression/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt
new file mode 100644
index 0000000..291ef03
--- /dev/null
+++ b/src/parquet/compression/CMakeLists.txt
@@ -0,0 +1,30 @@
+# Copyright 2012 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Static library with the LZ4 and Snappy codec implementations.
+add_library(parquet_compression STATIC
+  lz4-codec.cc
+  snappy-codec.cc
+)
+target_link_libraries(parquet_compression
+  lz4static
+  snappystatic)
+
+# A STATIC library is an ARCHIVE output artifact, so its location is governed
+# by ARCHIVE_OUTPUT_DIRECTORY. LIBRARY_OUTPUT_DIRECTORY only applies to
+# shared/module libraries; it is kept so existing behavior is unchanged.
+set_target_properties(parquet_compression
+  PROPERTIES
+  LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+  ARCHIVE_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}")
+
+# Headers: compression. Installs the public codec interface header into
+# include/parquet/compression (relative to the install prefix).
+install(FILES
+ codec.h
+ DESTINATION include/parquet/compression)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
new file mode 100644
index 0000000..8166847
--- /dev/null
+++ b/src/parquet/compression/codec.h
@@ -0,0 +1,71 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PARQUET_COMPRESSION_CODEC_H
+#define PARQUET_COMPRESSION_CODEC_H
+
+#include "parquet/parquet.h"
+
+#include <cstdint>
+#include "parquet/thrift/parquet_constants.h"
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// Abstract interface for a block compression codec.
+class Codec {
+ public:
+  virtual ~Codec() = default;
+
+  // Decompresses input[0, input_len) into output_buffer, which has room for
+  // output_len bytes.
+  virtual void Decompress(int input_len, const uint8_t* input,
+      int output_len, uint8_t* output_buffer) = 0;
+
+  // Compresses input[0, input_len) into output_buffer (capacity
+  // output_buffer_len bytes) and returns the number of bytes written.
+  virtual int Compress(int input_len, const uint8_t* input,
+      int output_buffer_len, uint8_t* output_buffer) = 0;
+
+  // Returns an upper bound on the compressed size of input_len bytes.
+  virtual int MaxCompressedLen(int input_len, const uint8_t* input) = 0;
+
+  // Codec name, e.g. "snappy".
+  virtual const char* name() const = 0;
+};
+
+
+// Snappy codec, implemented with snappy's raw (unframed) compress/uncompress
+// calls. Overrides are marked `override` so signature drift from Codec is a
+// compile error instead of a silently hidden function.
+class SnappyCodec : public Codec {
+ public:
+  void Decompress(int input_len, const uint8_t* input,
+      int output_len, uint8_t* output_buffer) override;
+
+  int Compress(int input_len, const uint8_t* input,
+      int output_buffer_len, uint8_t* output_buffer) override;
+
+  int MaxCompressedLen(int input_len, const uint8_t* input) override;
+
+  const char* name() const override { return "snappy"; }
+};
+
+// Lz4 codec. Overrides are marked `override` so signature drift from Codec is
+// a compile error instead of a silently hidden function.
+class Lz4Codec : public Codec {
+ public:
+  void Decompress(int input_len, const uint8_t* input,
+      int output_len, uint8_t* output_buffer) override;
+
+  int Compress(int input_len, const uint8_t* input,
+      int output_buffer_len, uint8_t* output_buffer) override;
+
+  int MaxCompressedLen(int input_len, const uint8_t* input) override;
+
+  const char* name() const override { return "lz4"; }
+};
+
+} // namespace parquet_cpp
+
+#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/compression/lz4-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc
new file mode 100644
index 0000000..6655387
--- /dev/null
+++ b/src/parquet/compression/lz4-codec.cc
@@ -0,0 +1,40 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "parquet/compression/codec.h"
+
+#include <lz4.h>
+
+namespace parquet_cpp {
+
+// Decompresses the LZ4 block input[0, input_len) into output_buffer, which has
+// room for output_len bytes. Throws ParquetException if the data is malformed
+// or does not decompress to exactly output_len bytes.
+void Lz4Codec::Decompress(int input_len, const uint8_t* input,
+    int output_len, uint8_t* output_buffer) {
+  // LZ4_decompress_safe bounds-checks both the compressed input (input_len) and
+  // the output (output_len), so corrupt or hostile data cannot read or write out
+  // of bounds. It returns the number of bytes written, negative on error.
+  int n = LZ4_decompress_safe(reinterpret_cast<const char*>(input),
+      reinterpret_cast<char*>(output_buffer), input_len, output_len);
+  if (n != output_len) {
+    throw parquet_cpp::ParquetException("Corrupt lz4 compressed data.");
+  }
+}
+
+// Worst-case LZ4 output size for input_len bytes; 'input' is not inspected.
+int Lz4Codec::MaxCompressedLen(int input_len, const uint8_t* input) {
+  const int bound = LZ4_compressBound(input_len);
+  return bound;
+}
+
+// Compresses input[0, input_len) into output_buffer and returns the number of
+// compressed bytes. Throws ParquetException if the result does not fit in
+// output_buffer_len bytes (a buffer of MaxCompressedLen() bytes always fits).
+int Lz4Codec::Compress(int input_len, const uint8_t* input,
+    int output_buffer_len, uint8_t* output_buffer) {
+  // Unlike LZ4_compress, the limitedOutput variant never writes more than
+  // output_buffer_len bytes; it returns 0 when the output does not fit.
+  int n = LZ4_compress_limitedOutput(reinterpret_cast<const char*>(input),
+      reinterpret_cast<char*>(output_buffer), input_len, output_buffer_len);
+  if (n <= 0) {
+    throw parquet_cpp::ParquetException("Insufficient buffer for lz4 compression.");
+  }
+  return n;
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/compression/snappy-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc
new file mode 100644
index 0000000..0633d47
--- /dev/null
+++ b/src/parquet/compression/snappy-codec.cc
@@ -0,0 +1,42 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "parquet/compression/codec.h"
+
+#include <snappy.h>
+
+namespace parquet_cpp {
+
+// Decompresses the snappy data input[0, input_len) into output_buffer, which
+// has room for output_len bytes. Throws ParquetException if the data is corrupt
+// or would decompress to more than output_len bytes.
+void SnappyCodec::Decompress(int input_len, const uint8_t* input,
+    int output_len, uint8_t* output_buffer) {
+  const char* compressed = reinterpret_cast<const char*>(input);
+  const size_t compressed_len = static_cast<size_t>(input_len);
+  // RawUncompress writes the full length recorded in the stream header without
+  // knowing the size of output_buffer, so validate that length first.
+  size_t uncompressed_len = 0;
+  if (output_len < 0 ||
+      !snappy::GetUncompressedLength(compressed, compressed_len, &uncompressed_len) ||
+      uncompressed_len > static_cast<size_t>(output_len)) {
+    throw parquet_cpp::ParquetException("Corrupt snappy compressed data.");
+  }
+  if (!snappy::RawUncompress(compressed, compressed_len,
+      reinterpret_cast<char*>(output_buffer))) {
+    throw parquet_cpp::ParquetException("Corrupt snappy compressed data.");
+  }
+}
+
+// Upper bound on the snappy-compressed size of input_len bytes; 'input' is not
+// inspected.
+int SnappyCodec::MaxCompressedLen(int input_len, const uint8_t* input) {
+  const size_t bound = snappy::MaxCompressedLength(static_cast<size_t>(input_len));
+  return static_cast<int>(bound);
+}
+
+// Compresses input[0, input_len) into output_buffer and returns the number of
+// compressed bytes. snappy::RawCompress requires the destination to hold
+// MaxCompressedLength(input_len) bytes, so a smaller output_buffer_len throws
+// ParquetException instead of overflowing the buffer.
+int SnappyCodec::Compress(int input_len, const uint8_t* input,
+    int output_buffer_len, uint8_t* output_buffer) {
+  const size_t input_size = static_cast<size_t>(input_len);
+  if (output_buffer_len < 0 ||
+      static_cast<size_t>(output_buffer_len) < snappy::MaxCompressedLength(input_size)) {
+    throw parquet_cpp::ParquetException("Output buffer too small for snappy compression.");
+  }
+  size_t output_len = 0;
+  snappy::RawCompress(reinterpret_cast<const char*>(input), input_size,
+      reinterpret_cast<char*>(output_buffer), &output_len);
+  return static_cast<int>(output_len);
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt
new file mode 100644
index 0000000..72baf48
--- /dev/null
+++ b/src/parquet/encodings/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Copyright 2015 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Headers: encodings. Installs the public encoding headers into
+# include/parquet/encodings (relative to the install prefix).
+install(FILES
+ encodings.h
+ bool-encoding.h
+ delta-bit-pack-encoding.h
+ delta-byte-array-encoding.h
+ delta-length-byte-array-encoding.h
+ dictionary-encoding.h
+ plain-encoding.h
+ DESTINATION include/parquet/encodings)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/bool-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/bool-encoding.h b/src/parquet/encodings/bool-encoding.h
new file mode 100644
index 0000000..8eb55bc
--- /dev/null
+++ b/src/parquet/encodings/bool-encoding.h
@@ -0,0 +1,48 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PARQUET_BOOL_ENCODING_H
+#define PARQUET_BOOL_ENCODING_H
+
+#include "parquet/encodings/encodings.h"
+
+#include <algorithm>
+
+namespace parquet_cpp {
+
+// Decoder for PLAIN-encoded BOOLEAN columns. The values are read with an
+// RleDecoder using a bit width of 1.
+class BoolDecoder : public Decoder {
+ public:
+  BoolDecoder() : Decoder(parquet::Type::BOOLEAN, parquet::Encoding::PLAIN) {}
+
+  // Starts decoding num_values booleans from data[0, len).
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    decoder_ = RleDecoder(data, len, 1);
+    num_values_ = num_values;
+  }
+
+  // Decodes up to max_values booleans into buffer (never more than remain from
+  // SetData()) and returns how many were decoded. Calls
+  // ParquetException::EofException() if the underlying data runs out early.
+  virtual int GetBool(bool* buffer, int max_values) {
+    const int count = std::min(max_values, num_values_);
+    for (int i = 0; i < count; ++i) {
+      if (!decoder_.Get(buffer + i)) ParquetException::EofException();
+    }
+    num_values_ -= count;
+    return count;
+  }
+
+ private:
+  RleDecoder decoder_;
+};
+
+} // namespace parquet_cpp
+
+#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/delta-bit-pack-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h
new file mode 100644
index 0000000..77a3b26
--- /dev/null
+++ b/src/parquet/encodings/delta-bit-pack-encoding.h
@@ -0,0 +1,151 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PARQUET_DELTA_BIT_PACK_ENCODING_H
+#define PARQUET_DELTA_BIT_PACK_ENCODING_H
+
+#include "parquet/encodings/encodings.h"
+
+#include <algorithm>
+#include <vector>
+
+namespace parquet_cpp {
+
+// Decoder for DELTA_BINARY_PACKED INT32/INT64 data. Page layout:
+//   header: <block size> <mini blocks per block> <total value count> <first value>
+//   blocks: <min delta> <one bit width byte per mini block> <bit-packed mini blocks>
+// The header is read once per page; only the block fields repeat.
+class DeltaBitPackDecoder : public Decoder {
+ public:
+  explicit DeltaBitPackDecoder(const parquet::Type::type& type)
+      : Decoder(type, parquet::Encoding::DELTA_BINARY_PACKED),
+        header_read_(false),
+        total_value_count_(0),
+        num_mini_blocks_(0),
+        values_per_mini_block_(0),
+        values_current_mini_block_(0),
+        min_delta_(0),
+        mini_block_idx_(0),
+        delta_bit_width_(0),
+        last_value_(0) {
+    if (type != parquet::Type::INT32 && type != parquet::Type::INT64) {
+      throw ParquetException("Delta bit pack encoding should only be for integer data.");
+    }
+  }
+
+  // Starts a new page. All per-page state (header, mini block widths and
+  // position) is reset so nothing carries over from the previous page.
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    num_values_ = num_values;
+    decoder_ = BitReader(data, len);
+    header_read_ = false;
+    delta_bit_widths_.clear();
+    mini_block_idx_ = 0;
+    values_current_mini_block_ = 0;
+  }
+
+  virtual int GetInt32(int32_t* buffer, int max_values) {
+    return GetInternal(buffer, max_values);
+  }
+
+  virtual int GetInt64(int64_t* buffer, int max_values) {
+    return GetInternal(buffer, max_values);
+  }
+
+ private:
+  // Reads the page header. Its first value is left in 'last_value_'.
+  void InitHeader() {
+    uint64_t block_size;
+    if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
+    if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
+    if (!decoder_.GetVlqInt(&total_value_count_)) ParquetException::EofException();
+    if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
+    if (num_mini_blocks_ == 0 || block_size / num_mini_blocks_ == 0) {
+      throw ParquetException("Invalid delta bit pack block header.");
+    }
+    values_per_mini_block_ = block_size / num_mini_blocks_;
+    delta_bit_widths_.resize(num_mini_blocks_);
+    // Past the last mini block, so the first delta read starts a new block.
+    mini_block_idx_ = num_mini_blocks_;
+    values_current_mini_block_ = 0;
+    header_read_ = true;
+  }
+
+  // Reads the min delta and mini block bit widths at the start of a block.
+  void InitBlock() {
+    if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
+    for (uint64_t i = 0; i < num_mini_blocks_; ++i) {
+      if (!decoder_.GetAligned<uint8_t>(1, &delta_bit_widths_[i])) {
+        ParquetException::EofException();
+      }
+    }
+    mini_block_idx_ = 0;
+    delta_bit_width_ = delta_bit_widths_[0];
+    values_current_mini_block_ = values_per_mini_block_;
+  }
+
+  template <typename T>
+  int GetInternal(T* buffer, int max_values) {
+    max_values = std::min(max_values, num_values_);
+    for (int i = 0; i < max_values; ++i) {
+      if (UNLIKELY(!header_read_)) {
+        InitHeader();
+        buffer[i] = last_value_;
+        continue;
+      }
+      if (UNLIKELY(values_current_mini_block_ == 0)) {
+        ++mini_block_idx_;
+        if (mini_block_idx_ < num_mini_blocks_) {
+          delta_bit_width_ = delta_bit_widths_[mini_block_idx_];
+          values_current_mini_block_ = values_per_mini_block_;
+        } else {
+          InitBlock();
+        }
+      }
+
+      // TODO: the key to this algorithm is to decode the entire miniblock at once.
+      int64_t delta;
+      if (!decoder_.GetValue(delta_bit_width_, &delta)) ParquetException::EofException();
+      // The format defines this sum with wrap-around; unsigned math avoids UB.
+      last_value_ = static_cast<int64_t>(static_cast<uint64_t>(last_value_) +
+          static_cast<uint64_t>(min_delta_) + static_cast<uint64_t>(delta));
+      buffer[i] = last_value_;
+      --values_current_mini_block_;
+    }
+    num_values_ -= max_values;
+    return max_values;
+  }
+
+  BitReader decoder_;
+  // Whether InitHeader() has consumed this page's header yet.
+  bool header_read_;
+  // Value count from the page header; read to advance the stream, otherwise unused.
+  uint64_t total_value_count_;
+  uint64_t num_mini_blocks_;
+  uint64_t values_per_mini_block_;
+  // Deltas still unread in the current mini block.
+  uint64_t values_current_mini_block_;
+
+  int64_t min_delta_;
+  uint64_t mini_block_idx_;
+  std::vector<uint8_t> delta_bit_widths_;
+  int delta_bit_width_;
+
+  // Most recently decoded value; each delta is added to it.
+  int64_t last_value_;
+};
+
+}  // namespace parquet_cpp
+
+#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/delta-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h
new file mode 100644
index 0000000..3396586
--- /dev/null
+++ b/src/parquet/encodings/delta-byte-array-encoding.h
@@ -0,0 +1,101 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PARQUET_DELTA_BYTE_ARRAY_ENCODING_H
+#define PARQUET_DELTA_BYTE_ARRAY_ENCODING_H
+
+#include "parquet/encodings/encodings.h"
+
+#include <algorithm>
+#include <cstring>
+#include <deque>
+#include <vector>
+
+namespace parquet_cpp {
+
+// Decoder for DELTA_BYTE_ARRAY data: each value is the first 'prefix_len'
+// bytes of the previous value followed by a suffix. Prefix lengths are read
+// with a DeltaBitPackDecoder and suffixes with a DeltaLengthByteArrayDecoder.
+class DeltaByteArrayDecoder : public Decoder {
+ public:
+  DeltaByteArrayDecoder()
+      : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_BYTE_ARRAY),
+        prefix_len_decoder_(parquet::Type::INT32),
+        suffix_decoder_(),
+        last_value_() {
+  }
+
+  // Starts a new page: a 4-byte size of the encoded prefix lengths, the
+  // prefix lengths themselves, then the suffixes.
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    const int kSizeBytes = static_cast<int>(sizeof(int32_t));
+    num_values_ = num_values;
+    // The first value of a page has no predecessor to share a prefix with.
+    last_value_ = ByteArray();
+    if (len == 0) return;
+    if (len < kSizeBytes) ParquetException::EofException();
+    // memcpy rather than a pointer cast: 'data' carries no alignment guarantee.
+    int32_t prefix_len_length;
+    memcpy(&prefix_len_length, data, sizeof(prefix_len_length));
+    data += kSizeBytes;
+    len -= kSizeBytes;
+    if (prefix_len_length < 0 || prefix_len_length > len) {
+      ParquetException::EofException();
+    }
+    prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
+    data += prefix_len_length;
+    len -= prefix_len_length;
+    suffix_decoder_.SetData(num_values, data, len);
+  }
+
+  // Decodes up to 'max_values' values into 'buffer'. The bytes are owned by
+  // this decoder and remain valid until it is destroyed.
+  virtual int GetByteArray(ByteArray* buffer, int max_values) {
+    max_values = std::min(max_values, num_values_);
+    for (int i = 0; i < max_values; ++i) {
+      int32_t prefix_len = 0;
+      prefix_len_decoder_.GetInt32(&prefix_len, 1);
+      ByteArray suffix;
+      suffix_decoder_.GetByteArray(&suffix, 1);
+      // A prefix longer than the previous value would read past its end.
+      if (prefix_len < 0 || static_cast<uint32_t>(prefix_len) > last_value_.len) {
+        throw ParquetException("Invalid delta byte array prefix length.");
+      }
+      buffer[i].len = prefix_len + suffix.len;
+
+      storage_.push_back(std::vector<uint8_t>(buffer[i].len));
+      uint8_t* result = storage_.back().data();
+      if (prefix_len > 0) memcpy(result, last_value_.ptr, prefix_len);
+      if (suffix.len > 0) memcpy(result + prefix_len, suffix.ptr, suffix.len);
+
+      buffer[i].ptr = result;
+      last_value_ = buffer[i];
+    }
+    num_values_ -= max_values;
+    return max_values;
+  }
+
+ private:
+  DeltaBitPackDecoder prefix_len_decoder_;
+  DeltaLengthByteArrayDecoder suffix_decoder_;
+  ByteArray last_value_;
+  // Backing memory for every decoded value. A deque never relocates its
+  // elements, so pointers handed out in ByteArray::ptr stay valid.
+  // TODO: this grows for the decoder's lifetime; add a way to release it.
+  std::deque<std::vector<uint8_t> > storage_;
+};
+
+}  // namespace parquet_cpp
+
+#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/delta-length-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h
new file mode 100644
index 0000000..06bf39d
--- /dev/null
+++ b/src/parquet/encodings/delta-length-byte-array-encoding.h
@@ -0,0 +1,88 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H
+#define PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H
+
+#include "parquet/encodings/encodings.h"
+
+#include <algorithm>
+#include <cstring>
+#include <vector>
+
+namespace parquet_cpp {
+
+// Decoder for DELTA_LENGTH_BYTE_ARRAY data: all value lengths (delta bit
+// packed) followed by the concatenated value bytes.
+class DeltaLengthByteArrayDecoder : public Decoder {
+ public:
+  DeltaLengthByteArrayDecoder()
+      : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY),
+        len_decoder_(parquet::Type::INT32),
+        data_(NULL),
+        len_(0) {
+  }
+
+  // Starts a new page: a 4-byte size of the encoded lengths, the lengths
+  // themselves, then the value bytes.
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    const int kSizeBytes = static_cast<int>(sizeof(int32_t));
+    num_values_ = num_values;
+    // Never keep pointing at the previous page's bytes.
+    data_ = data;
+    len_ = 0;
+    if (len == 0) return;
+    if (len < kSizeBytes) ParquetException::EofException();
+    // memcpy rather than a pointer cast: 'data' carries no alignment guarantee.
+    int32_t total_lengths_len;
+    memcpy(&total_lengths_len, data, sizeof(total_lengths_len));
+    data += kSizeBytes;
+    len -= kSizeBytes;
+    if (total_lengths_len < 0 || total_lengths_len > len) {
+      ParquetException::EofException();
+    }
+    len_decoder_.SetData(num_values, data, total_lengths_len);
+    data_ = data + total_lengths_len;
+    len_ = len - total_lengths_len;
+  }
+
+  // Decodes up to 'max_values' values into 'buffer'. Results point into the
+  // page data given to SetData(); nothing is copied.
+  virtual int GetByteArray(ByteArray* buffer, int max_values) {
+    max_values = std::min(max_values, num_values_);
+    // Heap buffer instead of a variable-length array, which is not standard
+    // C++ and puts an unbounded allocation on the stack.
+    std::vector<int32_t> lengths(max_values > 0 ? max_values : 0);
+    len_decoder_.GetInt32(lengths.data(), max_values);
+    for (int i = 0; i < max_values; ++i) {
+      // Reject lengths that would run past the end of the page.
+      if (lengths[i] < 0 || lengths[i] > len_) ParquetException::EofException();
+      buffer[i].len = lengths[i];
+      buffer[i].ptr = data_;
+      data_ += lengths[i];
+      len_ -= lengths[i];
+    }
+    num_values_ -= max_values;
+    return max_values;
+  }
+
+ private:
+  DeltaBitPackDecoder len_decoder_;
+  const uint8_t* data_;
+  int len_;
+};
+
+}  // namespace parquet_cpp
+
+#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
new file mode 100644
index 0000000..2501b2a
--- /dev/null
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -0,0 +1,154 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PARQUET_DICTIONARY_ENCODING_H
+#define PARQUET_DICTIONARY_ENCODING_H
+
+#include "parquet/encodings/encodings.h"
+
+#include <algorithm>
+#include <cstring>
+#include <vector>
+
+namespace parquet_cpp {
+
+// Decoder for RLE_DICTIONARY data: the page holds RLE/bit-packed indices that
+// are looked up in a dictionary copied out of a separate dictionary decoder.
+class DictionaryDecoder : public Decoder {
+ public:
+  // Initializes the dictionary with values from 'dictionary'. The data in dictionary
+  // is not guaranteed to persist in memory after this call so the dictionary decoder
+  // needs to copy the data out if necessary.
+  DictionaryDecoder(const parquet::Type::type& type, Decoder* dictionary)
+      : Decoder(type, parquet::Encoding::RLE_DICTIONARY) {
+    int num_dictionary_values = dictionary->values_left();
+    // data() rather than &v[0]: indexing an empty vector is undefined.
+    switch (type) {
+      case parquet::Type::BOOLEAN:
+        throw ParquetException("Boolean cols should not be dictionary encoded.");
+
+      case parquet::Type::INT32:
+        int32_dictionary_.resize(num_dictionary_values);
+        dictionary->GetInt32(int32_dictionary_.data(), num_dictionary_values);
+        break;
+      case parquet::Type::INT64:
+        int64_dictionary_.resize(num_dictionary_values);
+        dictionary->GetInt64(int64_dictionary_.data(), num_dictionary_values);
+        break;
+      case parquet::Type::FLOAT:
+        float_dictionary_.resize(num_dictionary_values);
+        dictionary->GetFloat(float_dictionary_.data(), num_dictionary_values);
+        break;
+      case parquet::Type::DOUBLE:
+        double_dictionary_.resize(num_dictionary_values);
+        dictionary->GetDouble(double_dictionary_.data(), num_dictionary_values);
+        break;
+      case parquet::Type::BYTE_ARRAY: {
+        byte_array_dictionary_.resize(num_dictionary_values);
+        dictionary->GetByteArray(byte_array_dictionary_.data(), num_dictionary_values);
+        size_t total_size = 0;
+        for (int i = 0; i < num_dictionary_values; ++i) {
+          total_size += byte_array_dictionary_[i].len;
+        }
+        // Copy all values into one buffer owned by this decoder and repoint
+        // each entry at its copy.
+        byte_array_data_.resize(total_size);
+        size_t offset = 0;
+        for (int i = 0; i < num_dictionary_values; ++i) {
+          ByteArray& value = byte_array_dictionary_[i];
+          if (value.len > 0) {
+            memcpy(byte_array_data_.data() + offset, value.ptr, value.len);
+          }
+          value.ptr = byte_array_data_.data() + offset;
+          offset += value.len;
+        }
+        break;
+      }
+      default:
+        ParquetException::NYI("Unsupported dictionary type");
+    }
+  }
+
+  // Starts a new page: a 1-byte index bit width, then the RLE-encoded indices.
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    num_values_ = num_values;
+    if (len == 0) return;
+    uint8_t bit_width = *data;
+    ++data;
+    --len;
+    idx_decoder_ = RleDecoder(data, len, bit_width);
+  }
+
+  virtual int GetInt32(int32_t* buffer, int max_values) {
+    return GetFromDictionary(int32_dictionary_, buffer, max_values);
+  }
+
+  virtual int GetInt64(int64_t* buffer, int max_values) {
+    return GetFromDictionary(int64_dictionary_, buffer, max_values);
+  }
+
+  virtual int GetFloat(float* buffer, int max_values) {
+    return GetFromDictionary(float_dictionary_, buffer, max_values);
+  }
+
+  virtual int GetDouble(double* buffer, int max_values) {
+    return GetFromDictionary(double_dictionary_, buffer, max_values);
+  }
+
+  virtual int GetByteArray(ByteArray* buffer, int max_values) {
+    return GetFromDictionary(byte_array_dictionary_, buffer, max_values);
+  }
+
+ private:
+  // Writes the dictionary entries for the next 'max_values' indices (capped at
+  // the values left in the page) into 'buffer' and returns how many were written.
+  template <typename T>
+  int GetFromDictionary(const std::vector<T>& dictionary, T* buffer, int max_values) {
+    max_values = std::min(max_values, num_values_);
+    for (int i = 0; i < max_values; ++i) {
+      int idx = index();
+      // Corrupt data must not turn into an out-of-bounds read.
+      if (idx < 0 || static_cast<size_t>(idx) >= dictionary.size()) {
+        throw ParquetException("Dictionary index out of range.");
+      }
+      buffer[i] = dictionary[idx];
+    }
+    return max_values;
+  }
+
+  // Reads the next dictionary index and counts it against the page's values.
+  int index() {
+    int idx = 0;
+    if (!idx_decoder_.Get(&idx)) ParquetException::EofException();
+    --num_values_;
+    return idx;
+  }
+
+  // Only one is set.
+  std::vector<int32_t> int32_dictionary_;
+  std::vector<int64_t> int64_dictionary_;
+  std::vector<float> float_dictionary_;
+  std::vector<double> double_dictionary_;
+  std::vector<ByteArray> byte_array_dictionary_;
+
+  // Data that contains the byte array data (byte_array_dictionary_ just has the
+  // pointers).
+  std::vector<uint8_t> byte_array_data_;
+
+  RleDecoder idx_decoder_;
+};
+
+}  // namespace parquet_cpp
+
+#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
new file mode 100644
index 0000000..9211bf8
--- /dev/null
+++ b/src/parquet/encodings/encodings.h
@@ -0,0 +1,89 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PARQUET_ENCODINGS_ENCODINGS_H
+#define PARQUET_ENCODINGS_ENCODINGS_H
+
+#include <cstdint>
+
+#include "parquet/thrift/parquet_constants.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/util/rle-encoding.h"
+#include "parquet/util/bit-stream-utils.inline.h"
+
+namespace parquet_cpp {
+
+// Base class for the page decoders. Each decoder is constructed for one
+// physical type and one encoding and is then handed pages through SetData().
+class Decoder {
+ public:
+  virtual ~Decoder() {}
+
+  // Sets the data for a new page. This will be called multiple times on the same
+  // decoder and should reset all internal state.
+  virtual void SetData(int num_values, const uint8_t* data, int len) = 0;
+
+  // Subclasses should override the ones they support. In each of these functions,
+  // the decoder decodes up to 'max_values' values, storing the result in 'buffer'.
+  // The function returns the number of values decoded, which should be max_values
+  // except for end of the current data page. The base implementations throw
+  // ParquetException, so calling one a subclass does not support is an error.
+  virtual int GetBool(bool* /* buffer */, int /* max_values */) {
+    throw ParquetException("Decoder does not implement this type.");
+  }
+  virtual int GetInt32(int32_t* /* buffer */, int /* max_values */) {
+    throw ParquetException("Decoder does not implement this type.");
+  }
+  virtual int GetInt64(int64_t* /* buffer */, int /* max_values */) {
+    throw ParquetException("Decoder does not implement this type.");
+  }
+  virtual int GetFloat(float* /* buffer */, int /* max_values */) {
+    throw ParquetException("Decoder does not implement this type.");
+  }
+  virtual int GetDouble(double* /* buffer */, int /* max_values */) {
+    throw ParquetException("Decoder does not implement this type.");
+  }
+  virtual int GetByteArray(ByteArray* /* buffer */, int /* max_values */) {
+    throw ParquetException("Decoder does not implement this type.");
+  }
+
+  // Returns the number of values left (for the last call to SetData()). This is
+  // the number of values left in this page.
+  int values_left() const { return num_values_; }
+
+  // Returns the encoding this decoder was constructed for.
+  parquet::Encoding::type encoding() const { return encoding_; }
+
+ protected:
+  Decoder(const parquet::Type::type& type, const parquet::Encoding::type& encoding)
+      : type_(type), encoding_(encoding), num_values_(0) {}
+
+  const parquet::Type::type type_;
+  const parquet::Encoding::type encoding_;
+  // Values remaining in the current page; subclasses decrement it as they decode.
+  int num_values_;
+};
+
+}  // namespace parquet_cpp
+
+// The concrete decoders are included after Decoder is defined because each of
+// them derives from it.
+#include "parquet/encodings/bool-encoding.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/encodings/dictionary-encoding.h"
+#include "parquet/encodings/delta-bit-pack-encoding.h"
+#include "parquet/encodings/delta-length-byte-array-encoding.h"
+#include "parquet/encodings/delta-byte-array-encoding.h"
+
+#endif  // PARQUET_ENCODINGS_ENCODINGS_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
new file mode 100644
index 0000000..b094cdb
--- /dev/null
+++ b/src/parquet/encodings/plain-encoding.h
@@ -0,0 +1,97 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef PARQUET_PLAIN_ENCODING_H
+#define PARQUET_PLAIN_ENCODING_H
+
+#include "parquet/encodings/encodings.h"
+
+#include <algorithm>
+#include <cstring>
+
+namespace parquet_cpp {
+
+// Decoder for PLAIN data of fixed-width numeric types and BYTE_ARRAY.
+class PlainDecoder : public Decoder {
+ public:
+  explicit PlainDecoder(const parquet::Type::type& type)
+      : Decoder(type, parquet::Encoding::PLAIN), data_(NULL), len_(0) {
+  }
+
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    num_values_ = num_values;
+    data_ = data;
+    len_ = len;
+  }
+
+  // Copies up to 'max_values' values of 'byte_size' bytes each into 'buffer'
+  // and returns how many were copied.
+  int GetValues(void* buffer, int max_values, int byte_size) {
+    max_values = std::min(max_values, num_values_);
+    // 64-bit product so a large request cannot overflow 'int'.
+    int64_t size = static_cast<int64_t>(max_values) * byte_size;
+    if (len_ < size) ParquetException::EofException();
+    if (size > 0) memcpy(buffer, data_, size);
+    data_ += size;
+    len_ -= static_cast<int>(size);
+    num_values_ -= max_values;
+    return max_values;
+  }
+
+  virtual int GetInt32(int32_t* buffer, int max_values) {
+    return GetValues(buffer, max_values, sizeof(int32_t));
+  }
+
+  virtual int GetInt64(int64_t* buffer, int max_values) {
+    return GetValues(buffer, max_values, sizeof(int64_t));
+  }
+
+  virtual int GetFloat(float* buffer, int max_values) {
+    return GetValues(buffer, max_values, sizeof(float));
+  }
+
+  virtual int GetDouble(double* buffer, int max_values) {
+    return GetValues(buffer, max_values, sizeof(double));
+  }
+
+  // Each value is a 4-byte length followed by that many bytes. Results point
+  // into the page data; nothing is copied.
+  virtual int GetByteArray(ByteArray* buffer, int max_values) {
+    const int kLengthSize = static_cast<int>(sizeof(uint32_t));
+    max_values = std::min(max_values, num_values_);
+    for (int i = 0; i < max_values; ++i) {
+      // The length prefix itself must be in bounds before it is read.
+      if (len_ < kLengthSize) ParquetException::EofException();
+      uint32_t value_len;
+      memcpy(&value_len, data_, sizeof(value_len));
+      if (static_cast<uint32_t>(len_ - kLengthSize) < value_len) {
+        ParquetException::EofException();
+      }
+      buffer[i].len = value_len;
+      buffer[i].ptr = data_ + kLengthSize;
+      data_ += kLengthSize + value_len;
+      len_ -= kLengthSize + static_cast<int>(value_len);
+    }
+    num_values_ -= max_values;
+    return max_values;
+  }
+
+ private:
+  const uint8_t* data_;
+  int len_;
+};
+
+}  // namespace parquet_cpp
+
+#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index c1a73b7..320f003 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -17,14 +17,18 @@
#include <exception>
#include <sstream>
-#include <boost/cstdint.hpp>
-#include <boost/scoped_ptr.hpp>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+// Needed for thrift
#include <boost/shared_ptr.hpp>
-#include <boost/unordered_map.hpp>
-#include "gen-cpp/parquet_constants.h"
-#include "gen-cpp/parquet_types.h"
-#include "impala/rle-encoding.h"
+#include "parquet/thrift/parquet_constants.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/util/rle-encoding.h"
// TCompactProtocol requires some #defines to work right.
#define SIGNED_RIGHT_SHIFT_IS 1
@@ -36,6 +40,17 @@
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
+namespace std {
+// Lets Encoding::type key std::unordered_map; std::hash for enums is C++14 (LWG 2148).
+template <>
+struct hash<parquet::Encoding::type> {
+  std::size_t operator()(const parquet::Encoding::type& encoding) const {
+    const int as_int = static_cast<int>(encoding);
+    return std::hash<int>()(as_int);
+  }
+};
+}  // namespace std
+
namespace parquet_cpp {
class Codec;
@@ -146,18 +161,18 @@ class ColumnReader {
InputStream* stream_;
// Compression codec to use.
- boost::scoped_ptr<Codec> decompressor_;
+ std::unique_ptr<Codec> decompressor_;
std::vector<uint8_t> decompression_buffer_;
// Map of compression type to decompressor object.
- boost::unordered_map<parquet::Encoding::type, boost::shared_ptr<Decoder> > decoders_;
+ std::unordered_map<parquet::Encoding::type, std::shared_ptr<Decoder> > decoders_;
parquet::PageHeader current_page_header_;
// Not set if field is required.
- boost::scoped_ptr<impala::RleDecoder> definition_level_decoder_;
+ std::unique_ptr<RleDecoder> definition_level_decoder_;
// Not set for flat schemas.
- boost::scoped_ptr<impala::RleDecoder> repetition_level_decoder_;
+ std::unique_ptr<RleDecoder> repetition_level_decoder_;
Decoder* current_decoder_;
int num_buffered_values_;
@@ -241,7 +256,6 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
*len = *len - bytes_left;
}
-}
+} // namespace parquet_cpp
#endif
-
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/thrift/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/CMakeLists.txt b/src/parquet/thrift/CMakeLists.txt
new file mode 100644
index 0000000..e2a00c9
--- /dev/null
+++ b/src/parquet/thrift/CMakeLists.txt
@@ -0,0 +1,29 @@
+# Copyright 2012 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+add_library(parquet_thrift STATIC
+  parquet_constants.cpp
+  parquet_types.cpp
+)
+# A STATIC library is an ARCHIVE artifact: LIBRARY_OUTPUT_DIRECTORY (module and
+# non-DLL shared libraries only) has no effect on it.
+set_target_properties(parquet_thrift
+  PROPERTIES
+  ARCHIVE_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}")
+
+# Headers: thrift
+install(FILES
+  parquet_types.h
+  parquet_constants.h
+  DESTINATION include/parquet/thrift)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/thrift/parquet_constants.cpp
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/parquet_constants.cpp b/src/parquet/thrift/parquet_constants.cpp
new file mode 100644
index 0000000..caa5af6
--- /dev/null
+++ b/src/parquet/thrift/parquet_constants.cpp
@@ -0,0 +1,17 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+#include "parquet_constants.h"
+
+namespace parquet {
+// The single instance of the generated constants holder declared in parquet_constants.h.
+const parquetConstants g_parquet_constants;
+
+parquetConstants::parquetConstants() {
+}
+
+} // namespace
+
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/337cf584/src/parquet/thrift/parquet_constants.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/parquet_constants.h b/src/parquet/thrift/parquet_constants.h
new file mode 100644
index 0000000..71d6f58
--- /dev/null
+++ b/src/parquet/thrift/parquet_constants.h
@@ -0,0 +1,24 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+#ifndef parquet_CONSTANTS_H
+#define parquet_CONSTANTS_H
+
+#include "parquet_types.h"
+
+namespace parquet {
+// Thrift-generated holder for IDL constants; it declares no constant members.
+class parquetConstants {
+ public:
+ parquetConstants();
+
+};
+// The single instance, defined in parquet_constants.cpp.
+extern const parquetConstants g_parquet_constants;
+
+} // namespace
+
+#endif