You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/02/26 18:52:50 UTC
[1/2] parquet-cpp git commit: PARQUET-494: Implement
DictionaryEncoder and test dictionary decoding
Repository: parquet-cpp
Updated Branches:
refs/heads/master 1df5a26d6 -> c6e069297
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/mem-pool.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.cc b/src/parquet/util/mem-pool.cc
new file mode 100644
index 0000000..6e56c28
--- /dev/null
+++ b/src/parquet/util/mem-pool.cc
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Initially imported from Apache Impala on 2016-02-23, and has been modified
+// since for parquet-cpp
+
+#include "parquet/util/mem-pool.h"
+
+#include <stdio.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <sstream>
+#include <string>
+
+#include "parquet/util/bit-util.h"
+
+namespace parquet_cpp {
+
+// Out-of-class definitions for the static integer constants that mem-pool.h declares
+// and initializes inside the class body.
+const int MemPool::INITIAL_CHUNK_SIZE;
+const int MemPool::MAX_CHUNK_SIZE;
+
+// Creates an empty pool: there is no current chunk (index -1), all byte counters are
+// zero, and the size used for the next new chunk starts at INITIAL_CHUNK_SIZE.
+MemPool::MemPool()
+ : current_chunk_idx_(-1),
+ next_chunk_size_(INITIAL_CHUNK_SIZE),
+ total_allocated_bytes_(0),
+ peak_allocated_bytes_(0),
+ total_reserved_bytes_(0) {}
+
+// Records 'buf' and its capacity 'size' (in bytes) as a chunk from which no bytes
+// have been handed out yet.
+MemPool::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
+ : data(buf),
+ size(size),
+ allocated_bytes(0) {
+}
+
+// Releases the buffer of every chunk the pool still holds. The chunk list itself is
+// deliberately left untouched so that the check below can report a pool that was
+// destroyed without a preceding FreeAll() or AcquireData().
+MemPool::~MemPool() {
+  for (ChunkInfo& chunk : chunks_) {
+    free(chunk.data);
+  }
+
+  DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
+}
+
+// Marks every chunk as unused so its memory can be handed out again. No chunk is
+// freed, and the pool is left without a current chunk.
+void MemPool::Clear() {
+  for (ChunkInfo& info : chunks_) {
+    info.allocated_bytes = 0;
+  }
+  total_allocated_bytes_ = 0;
+  current_chunk_idx_ = -1;
+  DCHECK(CheckIntegrity(false));
+}
+
+// Frees the buffer of every chunk, drops all chunks from the pool and resets the
+// bookkeeping (current chunk, next chunk size, allocated and reserved byte counts)
+// to the state of a freshly constructed pool. peak_allocated_bytes_ is not reset.
+void MemPool::FreeAll() {
+  for (ChunkInfo& chunk : chunks_) {
+    free(chunk.data);
+  }
+  chunks_.clear();
+  next_chunk_size_ = INITIAL_CHUNK_SIZE;
+  current_chunk_idx_ = -1;
+  total_allocated_bytes_ = 0;
+  total_reserved_bytes_ = 0;
+}
+
+// Makes current_chunk_idx_ refer to an empty chunk with a capacity of at least
+// 'min_size' bytes. The free chunks (those located after the current chunk) are
+// searched first; if none is large enough, a new chunk is obtained with malloc().
+// Returns true on success. Returns false if malloc() fails, in which case
+// current_chunk_idx_ is set to the index of the last existing chunk.
+bool MemPool::FindChunk(int64_t min_size) {
+ // Try to allocate from a free chunk. The first free chunk, if any, will be immediately
+ // after the current chunk.
+ int first_free_idx = current_chunk_idx_ + 1;
+ // (cast size() to signed int in order to avoid everything else being cast to
+ // unsigned long, in particular -1)
+ while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) {
+ // we found a free chunk
+ DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0);
+
+ if (chunks_[current_chunk_idx_].size >= min_size) {
+ // This chunk is big enough. Move it before the other free chunks.
+ if (current_chunk_idx_ != first_free_idx) {
+ std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]);
+ current_chunk_idx_ = first_free_idx;
+ }
+ break;
+ }
+ }
+
+ // The loop above ran off the end of chunks_: no existing free chunk was big enough.
+ if (current_chunk_idx_ == static_cast<int>(chunks_.size())) {
+ // need to allocate new chunk.
+ int64_t chunk_size;
+ DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
+ DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE);
+
+ // A request larger than the scheduled chunk size gets a chunk of exactly its size.
+ chunk_size = std::max<int64_t>(min_size, next_chunk_size_);
+
+ // Allocate a new chunk. Return early if malloc fails.
+ uint8_t* buf = reinterpret_cast<uint8_t*>(malloc(chunk_size));
+ if (UNLIKELY(buf == NULL)) {
+ DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
+ current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
+ return false;
+ }
+
+ // If there are no free chunks put it at the end, otherwise before the first free.
+ if (first_free_idx == static_cast<int>(chunks_.size())) {
+ chunks_.push_back(ChunkInfo(chunk_size, buf));
+ } else {
+ current_chunk_idx_ = first_free_idx;
+ auto insert_chunk = chunks_.begin() + current_chunk_idx_;
+ chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf));
+ }
+ total_reserved_bytes_ += chunk_size;
+ // Don't increment the chunk size until the allocation succeeds: if an attempted
+ // large allocation fails we don't want to increase the chunk size further.
+ next_chunk_size_ = static_cast<int>(std::min<int64_t>(
+ chunk_size * 2, MAX_CHUNK_SIZE));
+ }
+
+ DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
+ DCHECK(CheckIntegrity(true));
+ return true;
+}
+
+// Moves the leading chunks of 'src' that hold data into this pool, inserting them
+// directly after this pool's current chunk, and transfers the matching reserved and
+// allocated byte counts. If 'keep_current' is true, src's current chunk stays in
+// src; otherwise src is emptied with FreeAll() once the transfer is done.
+void MemPool::AcquireData(MemPool* src, bool keep_current) {
+ DCHECK(src->CheckIntegrity(false));
+ // Number of chunks at the front of src->chunks_ that will change owner.
+ int num_acquired_chunks;
+ if (keep_current) {
+ num_acquired_chunks = src->current_chunk_idx_;
+ } else if (src->GetFreeOffset() == 0) {
+ // nothing in the last chunk
+ num_acquired_chunks = src->current_chunk_idx_;
+ } else {
+ num_acquired_chunks = src->current_chunk_idx_ + 1;
+ }
+
+ // Nothing to transfer.
+ if (num_acquired_chunks <= 0) {
+ if (!keep_current) src->FreeAll();
+ return;
+ }
+
+ auto end_chunk = src->chunks_.begin() + num_acquired_chunks;
+ int64_t total_transfered_bytes = 0;
+ for (auto i = src->chunks_.begin(); i != end_chunk; ++i) {
+ total_transfered_bytes += i->size;
+ }
+ src->total_reserved_bytes_ -= total_transfered_bytes;
+ total_reserved_bytes_ += total_transfered_bytes;
+
+ // insert new chunks after current_chunk_idx_
+ auto insert_chunk = chunks_.begin() + current_chunk_idx_ + 1;
+ chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk);
+ src->chunks_.erase(src->chunks_.begin(), end_chunk);
+ current_chunk_idx_ += num_acquired_chunks;
+
+ if (keep_current) {
+ // The chunk src kept is now the first one in src->chunks_.
+ src->current_chunk_idx_ = 0;
+ DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0);
+ // Everything except the bytes allocated in the kept chunk now belongs to this pool.
+ total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset();
+ src->total_allocated_bytes_ = src->GetFreeOffset();
+ } else {
+ src->current_chunk_idx_ = -1;
+ total_allocated_bytes_ += src->total_allocated_bytes_;
+ src->total_allocated_bytes_ = 0;
+ }
+ peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
+
+ if (!keep_current) src->FreeAll();
+ DCHECK(CheckIntegrity(false));
+}
+
+// Returns a one-line, human-readable summary of the pool: for each chunk its buffer
+// address, size and allocated byte count, followed by the current chunk index, the
+// summed chunk sizes and the total number of allocated bytes.
+std::string MemPool::DebugString() {
+  std::stringstream out;
+  out << "MemPool(#chunks=" << chunks_.size() << " [";
+  for (size_t i = 0; i < chunks_.size(); ++i) {
+    // The address is streamed as hex instead of being formatted with sprintf() into
+    // a fixed 16-byte buffer, which is too small for a pointer that needs more than
+    // 12 hex digits ("0x" + digits + "=" + terminating NUL).
+    out << (i > 0 ? " " : "")
+        << "0x" << std::hex << reinterpret_cast<uintptr_t>(chunks_[i].data)
+        << std::dec << "="
+        << chunks_[i].size
+        << "/" << chunks_[i].allocated_bytes;
+  }
+  out << "] current_chunk=" << current_chunk_idx_
+      << " total_sizes=" << GetTotalChunkSizes()
+      << " total_alloc=" << total_allocated_bytes_
+      << ")";
+  return out.str();
+}
+
+// Adds up the capacity, in bytes, of every chunk held by the pool, whether or not
+// any of its memory is currently handed out.
+int64_t MemPool::GetTotalChunkSizes() const {
+  int64_t total_size = 0;
+  for (const ChunkInfo& chunk : chunks_) {
+    total_size += chunk.size;
+  }
+  return total_size;
+}
+
+// DCHECKs the pool invariants and always returns true, so that the call itself can
+// be wrapped in a DCHECK. Chunks before the current one must contain data, chunks
+// after it must be empty, and the current chunk must be empty exactly when
+// 'current_chunk_empty' is true. The per-chunk allocated bytes must add up to
+// total_allocated_bytes_.
+bool MemPool::CheckIntegrity(bool current_chunk_empty) {
+  const int num_chunks = static_cast<int>(chunks_.size());
+  // check that current_chunk_idx_ points to the last chunk with allocated data
+  DCHECK_LT(current_chunk_idx_, num_chunks);
+  int64_t allocated_sum = 0;
+  for (int idx = 0; idx < num_chunks; ++idx) {
+    const ChunkInfo& info = chunks_[idx];
+    DCHECK_GT(info.size, 0);
+    const bool expect_empty = idx > current_chunk_idx_ ||
+        (idx == current_chunk_idx_ && current_chunk_empty);
+    if (expect_empty) {
+      DCHECK_EQ(info.allocated_bytes, 0);
+    } else {
+      DCHECK_GT(info.allocated_bytes, 0);
+    }
+    allocated_sum += info.allocated_bytes;
+  }
+  DCHECK_EQ(allocated_sum, total_allocated_bytes_);
+  return true;
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/mem-pool.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.h b/src/parquet/util/mem-pool.h
new file mode 100644
index 0000000..88a8715
--- /dev/null
+++ b/src/parquet/util/mem-pool.h
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Initially imported from Apache Impala on 2016-02-23, and has been modified
+// since for parquet-cpp
+
+#ifndef PARQUET_UTIL_MEM_POOL_H
+#define PARQUET_UTIL_MEM_POOL_H
+
+#include <stdio.h>
+#include <algorithm>
+#include <cstdint>
+#include <vector>
+#include <string>
+
+#include "parquet/util/logging.h"
+#include "parquet/util/bit-util.h"
+
+namespace parquet_cpp {
+
+/// A MemPool maintains a list of memory chunks from which it allocates memory in
+/// response to Allocate() calls;
+/// Chunks stay around for the lifetime of the mempool or until they are passed on to
+/// another mempool.
+//
+/// An Allocate() call will attempt to allocate memory from the chunk that was most
+/// recently added; if that chunk doesn't have enough memory to
+/// satisfy the allocation request, the free chunks are searched for one that is
+/// big enough otherwise a new chunk is added to the list.
+/// The current_chunk_idx_ always points to the last chunk with allocated memory.
+/// In order to keep allocation overhead low, chunk sizes double with each new one
+/// added, until they hit a maximum size.
+//
+/// Example:
+/// MemPool* p = new MemPool();
+/// for (int i = 0; i < 1024; ++i) {
+/// returns 8-byte aligned memory (effectively 24 bytes):
+/// .. = p->Allocate(17);
+/// }
+/// at this point, 17K have been requested through Allocate() calls (24K have been
+/// handed out, because each request is rounded up to 24 bytes) and
+/// 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K)
+/// We track total and peak allocated bytes. At this point they would be the same:
+/// 24K bytes. A call to Clear will return the allocated memory so
+/// total_allocated_bytes_
+/// becomes 0 while peak_allocated_bytes_ remains at 24K.
+/// p->Clear();
+/// the entire 1st chunk is returned:
+/// .. = p->Allocate(4 * 1024);
+/// 4K of the 2nd chunk are returned:
+/// .. = p->Allocate(4 * 1024);
+/// a new 20K chunk is created
+/// .. = p->Allocate(20 * 1024);
+//
+/// MemPool* p2 = new MemPool();
+/// the new mempool receives all chunks containing data from p
+/// p2->AcquireData(p, false);
+/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_
+/// remains unchanged.
+/// The one remaining (empty) chunk is released:
+/// delete p;
+
+class MemPool {
+ public:
+  MemPool();
+
+  /// Frees the memory of every chunk still held by the pool.
+  ~MemPool();
+
+  /// The pool owns the buffers of its chunks and frees them on destruction, so a
+  /// copy would free the same buffers a second time. Copying is therefore disabled.
+  MemPool(const MemPool&) = delete;
+  MemPool& operator=(const MemPool&) = delete;
+
+  /// Allocates 8-byte aligned section of memory of 'size' bytes at the end
+  /// of the current chunk. Creates a new chunk if there aren't any chunks
+  /// with enough capacity. Returns NULL if 'size' is 0 or if a new chunk is needed
+  /// but cannot be allocated.
+  uint8_t* Allocate(int size) {
+    return Allocate<false>(size);
+  }
+
+  /// Returns 'byte_size' bytes of the current chunk back to the mem pool. This can
+  /// only be used to return either all or part of the previous allocation returned
+  /// by Allocate().
+  void ReturnPartialAllocation(int byte_size) {
+    DCHECK_GE(byte_size, 0);
+    DCHECK(current_chunk_idx_ != -1);
+    ChunkInfo& info = chunks_[current_chunk_idx_];
+    DCHECK_GE(info.allocated_bytes, byte_size);
+    info.allocated_bytes -= byte_size;
+    total_allocated_bytes_ -= byte_size;
+  }
+
+  /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
+  void Clear();
+
+  /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
+  /// each mem pool
+  void FreeAll();
+
+  /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on
+  /// to its last allocated chunk that contains data.
+  /// All offsets handed out by calls to GetCurrentOffset() for 'src' become invalid.
+  void AcquireData(MemPool* src, bool keep_current);
+
+  /// Returns a human-readable description of the chunks and counters of this pool.
+  std::string DebugString();
+
+  int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
+  int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
+  int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
+
+  /// Returns the sum of the sizes of all chunks in chunks_.
+  int64_t GetTotalChunkSizes() const;
+
+ private:
+  friend class MemPoolTest;
+  static const int INITIAL_CHUNK_SIZE = 4 * 1024;
+
+  /// The maximum size of chunk that should be allocated. Allocations larger than this
+  /// size will get their own individual chunk.
+  static const int MAX_CHUNK_SIZE = 1024 * 1024;
+
+  struct ChunkInfo {
+    uint8_t* data;  // Owned by the ChunkInfo.
+    int64_t size;  // in bytes
+
+    /// bytes allocated via Allocate() in this chunk
+    int64_t allocated_bytes;
+
+    explicit ChunkInfo(int64_t size, uint8_t* buf);
+
+    ChunkInfo()
+      : data(NULL),
+        size(0),
+        allocated_bytes(0) {}
+  };
+
+  /// chunk from which we served the last Allocate() call;
+  /// always points to the last chunk that contains allocated data;
+  /// chunks 0..current_chunk_idx_ are guaranteed to contain data
+  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
+  /// -1 if no chunks present
+  int current_chunk_idx_;
+
+  /// The size of the next chunk to allocate.
+  int64_t next_chunk_size_;
+
+  /// sum of allocated_bytes over all chunks
+  int64_t total_allocated_bytes_;
+
+  /// Maximum number of bytes allocated from this pool at one time.
+  int64_t peak_allocated_bytes_;
+
+  /// sum of all bytes allocated in chunks_
+  int64_t total_reserved_bytes_;
+
+  std::vector<ChunkInfo> chunks_;
+
+  /// Find or allocate a chunk with at least min_size spare capacity and update
+  /// current_chunk_idx_. Also updates chunks_, total_reserved_bytes_ and
+  /// next_chunk_size_ if a new chunk needs to be created.
+  /// Returns false if a new chunk was needed but could not be allocated.
+  bool FindChunk(int64_t min_size);
+
+  /// Check integrity of the supporting data structures; always returns true but DCHECKs
+  /// all invariants.
+  /// If 'current_chunk_empty' is false, checks that the current chunk contains data.
+  bool CheckIntegrity(bool current_chunk_empty);
+
+  /// Return offset to unoccupied space in current chunk. The offset is 64 bits wide,
+  /// like ChunkInfo::allocated_bytes, so that it is never truncated.
+  int64_t GetFreeOffset() const {
+    if (current_chunk_idx_ == -1) return 0;
+    return chunks_[current_chunk_idx_].allocated_bytes;
+  }
+
+  /// Implementation of Allocate(): rounds 'size' up to a multiple of 8 bytes and
+  /// serves it from the current chunk, switching to another (possibly new) chunk
+  /// through FindChunk() when the current one is too small.
+  template <bool CHECK_LIMIT_FIRST>
+  uint8_t* Allocate(int size) {
+    // A negative size would decrease allocated_bytes and corrupt the accounting.
+    DCHECK_GE(size, 0);
+    if (size == 0) return NULL;
+
+    int64_t num_bytes = BitUtil::RoundUp(size, 8);
+    if (current_chunk_idx_ == -1
+        || num_bytes + chunks_[current_chunk_idx_].allocated_bytes
+        > chunks_[current_chunk_idx_].size) {
+      // If we couldn't allocate a new chunk, return NULL.
+      if (UNLIKELY(!FindChunk(num_bytes))) return NULL;
+    }
+    ChunkInfo& info = chunks_[current_chunk_idx_];
+    uint8_t* result = info.data + info.allocated_bytes;
+    DCHECK_LE(info.allocated_bytes + num_bytes, info.size);
+    info.allocated_bytes += num_bytes;
+    total_allocated_bytes_ += num_bytes;
+    // Compare as signed values: chunks_.size() is unsigned.
+    DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
+    peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
+    return result;
+  }
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_MEM_POOL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
index 2a43a36..b466e0e 100644
--- a/src/parquet/util/output.h
+++ b/src/parquet/util/output.h
@@ -20,7 +20,6 @@
#include <cstdint>
#include <memory>
-#include <vector>
#include "parquet/util/macros.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/rle-encoding.h b/src/parquet/util/rle-encoding.h
index 22b2c2f..77749f5 100644
--- a/src/parquet/util/rle-encoding.h
+++ b/src/parquet/util/rle-encoding.h
@@ -20,8 +20,8 @@
#ifndef PARQUET_UTIL_RLE_ENCODING_H
#define PARQUET_UTIL_RLE_ENCODING_H
-#include <algorithm>
#include <math.h>
+#include <algorithm>
#include "parquet/util/compiler-util.h"
#include "parquet/util/bit-stream-utils.inline.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/rle-test.cc b/src/parquet/util/rle-test.cc
index df020f5..5f18a6f 100644
--- a/src/parquet/util/rle-test.cc
+++ b/src/parquet/util/rle-test.cc
@@ -17,17 +17,18 @@
// From Apache Impala as of 2016-01-29
+#include <gtest/gtest.h>
+#include <math.h>
#include <stdlib.h>
#include <stdio.h>
+
+#include <boost/utility.hpp>
+
#include <cstdint>
#include <iostream>
#include <random>
#include <vector>
-#include <boost/utility.hpp>
-#include <gtest/gtest.h>
-#include <math.h>
-
#include "parquet/util/rle-encoding.h"
#include "parquet/util/bit-stream-utils.inline.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/sse-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/sse-util.h b/src/parquet/util/sse-util.h
index 588c30a..29bf2f9 100644
--- a/src/parquet/util/sse-util.h
+++ b/src/parquet/util/sse-util.h
@@ -25,6 +25,7 @@
namespace parquet_cpp {
+
/// This class contains constants useful for text processing with SSE4.2 intrinsics.
namespace SSEUtil {
/// Number of characters that fit in 64/128 bit register. SSE provides instructions
@@ -93,11 +94,17 @@ namespace SSEUtil {
+/// Issues the pcmpestrm instruction on 'str1' and 'str2' with the explicit lengths
+/// 'len1' and 'len2' and the compile-time immediate MODE, and returns its result.
template<int MODE>
static inline __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int len2) {
+#ifdef __clang__
/// Use asm reg rather than Yz output constraint to workaround LLVM bug 13199 -
/// clang doesn't support Y-prefixed asm constraints.
register volatile __m128i result asm("xmm0");
__asm__ volatile ("pcmpestrm %5, %2, %1"
: "=x"(result) : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE) : "cc");
+#else
+ /// Other compilers: bind the output through the Yz constraint directly.
+ __m128i result;
+ __asm__ volatile ("pcmpestrm %5, %2, %1"
+ : "=Yz"(result) : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE) : "cc");
+#endif
return result;
}
@@ -114,11 +121,22 @@ static inline uint32_t SSE4_crc32_u8(uint32_t crc, uint8_t v) {
return crc;
}
+/// Applies the crc32w instruction to 'crc' with the 16-bit operand 'v' and returns
+/// the updated 'crc'.
+static inline uint32_t SSE4_crc32_u16(uint32_t crc, uint16_t v) {
+ __asm__("crc32w %1, %0" : "+r"(crc) : "rm"(v));
+ return crc;
+}
+
+/// Applies the crc32l instruction to 'crc' with the 32-bit operand 'v' and returns
+/// the updated 'crc'.
static inline uint32_t SSE4_crc32_u32(uint32_t crc, uint32_t v) {
__asm__("crc32l %1, %0" : "+r"(crc) : "rm"(v));
return crc;
}
+/// Applies the crc32q instruction with the 64-bit operand 'v'. 'crc' is widened
+/// into a 64-bit variable for the instruction's read-write operand, and the return
+/// statement narrows the result back to 32 bits.
+static inline uint32_t SSE4_crc32_u64(uint32_t crc, uint64_t v) {
+ uint64_t result = crc;
+ __asm__("crc32q %1, %0" : "+r"(result) : "rm"(v));
+ return result;
+}
+
static inline int64_t POPCNT_popcnt_u64(uint64_t a) {
int64_t result;
__asm__("popcntq %1, %0" : "=r"(result) : "mr"(a) : "cc");
@@ -148,7 +166,9 @@ static inline int SSE4_cmpestri(
}
#define SSE4_crc32_u8 _mm_crc32_u8
+#define SSE4_crc32_u16 _mm_crc32_u16
#define SSE4_crc32_u32 _mm_crc32_u32
+#define SSE4_crc32_u64 _mm_crc32_u64
#define POPCNT_popcnt_u64 _mm_popcnt_u64
#else // IR_COMPILE without SSE 4.2.
@@ -174,11 +194,21 @@ static inline uint32_t SSE4_crc32_u8(uint32_t crc, uint8_t v) {
return 0;
}
+/// Stub for the 16-bit CRC helper: trips DCHECK(false) and returns 0 without
+/// reading its arguments.
+static inline uint32_t SSE4_crc32_u16(uint32_t crc, uint16_t v) {
+ DCHECK(false) << "CPU doesn't support SSE 4.2";
+ return 0;
+}
+
+/// Stub for the 32-bit CRC helper: trips DCHECK(false) and returns 0 without
+/// reading its arguments.
static inline uint32_t SSE4_crc32_u32(uint32_t crc, uint32_t v) {
DCHECK(false) << "CPU doesn't support SSE 4.2";
return 0;
}
+/// Stub for the 64-bit CRC helper: trips DCHECK(false) and returns 0 without
+/// reading its arguments.
+static inline uint32_t SSE4_crc32_u64(uint32_t crc, uint64_t v) {
+ DCHECK(false) << "CPU doesn't support SSE 4.2";
+ return 0;
+}
+
static inline int64_t POPCNT_popcnt_u64(uint64_t a) {
DCHECK(false) << "CPU doesn't support SSE 4.2";
return 0;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/stopwatch.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/stopwatch.h b/src/parquet/util/stopwatch.h
index 076cfc8..14da2c4 100644
--- a/src/parquet/util/stopwatch.h
+++ b/src/parquet/util/stopwatch.h
@@ -18,11 +18,12 @@
#ifndef PARQUET_UTIL_STOPWATCH_H
#define PARQUET_UTIL_STOPWATCH_H
-#include <iostream>
#include <stdio.h>
-#include <ctime>
#include <sys/time.h>
+#include <iostream>
+#include <ctime>
+
namespace parquet_cpp {
class StopWatch {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index 49c4131..9975ed9 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -31,6 +31,10 @@ namespace parquet_cpp {
namespace test {
+// gtest type list of the Parquet DataType structs; presumably meant as the type
+// parameter list of typed test cases (TYPED_TEST) -- confirm against the tests.
+typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type,
+ FloatType, DoubleType, ByteArrayType,
+ FLBAType> ParquetTypes;
+
template <typename T>
static inline void assert_vector_equal(const vector<T>& left,
const vector<T>& right) {
@@ -167,9 +171,9 @@ void random_fixed_byte_array(int n, uint32_t seed, uint8_t *buf, int len,
}
void random_byte_array(int n, uint32_t seed, uint8_t *buf,
- ByteArray* out, int max_size) {
+ ByteArray* out, int min_size, int max_size) {
std::mt19937 gen(seed);
- std::uniform_int_distribution<int> d1(0, max_size);
+ std::uniform_int_distribution<int> d1(min_size, max_size);
std::uniform_int_distribution<int> d2(0, 255);
for (int i = 0; i < n; ++i) {
out[i].len = d1(gen);
@@ -181,6 +185,11 @@ void random_byte_array(int n, uint32_t seed, uint8_t *buf,
}
}
+// Overload that keeps the previous five-argument signature: forwards to the
+// min_size/max_size version with a minimum length of 0.
+void random_byte_array(int n, uint32_t seed, uint8_t *buf,
+ ByteArray* out, int max_size) {
+ random_byte_array(n, seed, buf, out, 0, max_size);
+}
+
} // namespace test
} // namespace parquet_cpp
[2/2] parquet-cpp git commit: PARQUET-494: Implement
DictionaryEncoder and test dictionary decoding
Posted by ju...@apache.org.
PARQUET-494: Implement DictionaryEncoder and test dictionary decoding
I incorporated quite a bit of code from Impala for this patch, but did a bunch of work to get everything working. In particular, I wasn't happy with the hash table implementation in `dict-encoder.h` and so have written a simple new one that we can benchmark and tune as necessary.
The simplest way to pull in the DictEncoder (PARQUET-493) was to also bring in the `MemPool` implementation, suitably trimmed down. We can continue to refactor this as needed for parquet-cpp.
I also did some light refactoring using `TYPED_TEST` in `plain-encoding-test` (now `encoding-test`).
Author: Wes McKinney <we...@apache.org>
Closes #64 from wesm/PARQUET-494 and squashes the following commits:
c634abe [Wes McKinney] Refactor to create TestEncoderBase
a3a563a [Wes McKinney] Consolidate dictionary encoding code
2cc4ffe [Wes McKinney] Retrieve type_length() only once in PlainDecoder ctor
20ccd9e [Wes McKinney] Remove DictionaryEncoder shim layer for now
dcfc0aa [Wes McKinney] Remove redundant Int96 comparison
d98a2c0 [Wes McKinney] Dictionary encoding for booleans throws exception
05414f0 [Wes McKinney] Test dictionary encoding more types
9a5b1a4 [Wes McKinney] Enable include_order linting per PARQUET-539
f3f0efc [Wes McKinney] IWYU cleaning
d4191c6 [Wes McKinney] Add header installs, fix clang warning
1347b13 [Wes McKinney] Rename plain-encoding-test to encoding-test
09bf0fa [Wes McKinney] Fix bugs and add dictionary repeats
2e6af48 [Wes McKinney] Fix some bugs. FixedLenByteArray remains to get working.
69b5b69 [Wes McKinney] Refactor test fixtures to be less coupled to state. progress on getting dict encoding working
6b23716 [Wes McKinney] Create reusable DataType structs for test fixtures and other compile-time type resolution matters
67883fd [Wes McKinney] Bunch of combined work for dict encoding support:
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c6e06929
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c6e06929
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c6e06929
Branch: refs/heads/master
Commit: c6e069297a3b8d0f9ad45da04fe114d40c593115
Parents: 1df5a26
Author: Wes McKinney <we...@apache.org>
Authored: Fri Feb 26 09:52:46 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Fri Feb 26 09:52:46 2016 -0800
----------------------------------------------------------------------
CMakeLists.txt | 2 +-
src/parquet/column/column-reader-test.cc | 1 +
src/parquet/column/levels-test.cc | 3 +-
src/parquet/column/reader.cc | 5 +-
src/parquet/column/scanner-test.cc | 54 ++--
src/parquet/compression/codec-test.cc | 5 +-
src/parquet/compression/codec.h | 4 +-
src/parquet/compression/lz4-codec.cc | 3 +-
src/parquet/compression/snappy-codec.cc | 3 +-
src/parquet/encodings/CMakeLists.txt | 2 +-
src/parquet/encodings/dictionary-encoding.h | 311 +++++++++++++++++++++-
src/parquet/encodings/encoder.h | 5 -
src/parquet/encodings/encoding-test.cc | 309 +++++++++++++++++++++
src/parquet/encodings/plain-encoding-test.cc | 232 ----------------
src/parquet/encodings/plain-encoding.h | 96 ++++---
src/parquet/file/reader-internal.h | 2 -
src/parquet/reader-test.cc | 3 +-
src/parquet/schema/schema-descriptor-test.cc | 3 +-
src/parquet/types.h | 36 +++
src/parquet/util/CMakeLists.txt | 16 +-
src/parquet/util/bit-stream-utils.h | 2 +-
src/parquet/util/bit-util-test.cc | 5 +-
src/parquet/util/buffer-builder.h | 61 +++++
src/parquet/util/cpu-info.cc | 10 +-
src/parquet/util/dict-encoding.h | 36 +++
src/parquet/util/hash-util.h | 247 +++++++++++++++++
src/parquet/util/mem-pool-test.cc | 247 +++++++++++++++++
src/parquet/util/mem-pool.cc | 234 ++++++++++++++++
src/parquet/util/mem-pool.h | 208 +++++++++++++++
src/parquet/util/output.h | 1 -
src/parquet/util/rle-encoding.h | 2 +-
src/parquet/util/rle-test.cc | 9 +-
src/parquet/util/sse-util.h | 30 +++
src/parquet/util/stopwatch.h | 5 +-
src/parquet/util/test-common.h | 13 +-
35 files changed, 1840 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 218e74a..5ff9e6c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -267,7 +267,7 @@ if (UNIX)
add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py
--verbose=2
--linelength=90
- --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/include_order,-runtime/references,-readability/check
+ --filter=-whitespace/comments,-readability/todo,-build/header_guard,-runtime/references,-readability/check
`find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/parquet\\/thrift/g'`)
endif (UNIX)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 079201a..e64ef28 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <cstdint>
#include <cstdlib>
+#include <limits>
#include <memory>
#include <string>
#include <vector>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
index 0e3c20f..57aa562 100644
--- a/src/parquet/column/levels-test.cc
+++ b/src/parquet/column/levels-test.cc
@@ -15,13 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+#include <gtest/gtest.h>
#include <cstdint>
#include <memory>
#include <vector>
#include <string>
-#include <gtest/gtest.h>
-
#include "parquet/column/levels.h"
#include "parquet/types.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index 4011347..4cff810 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -52,8 +52,9 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
//
// TODO(wesm): investigate whether this all-or-nothing decoding of the
// dictionary makes sense and whether performance can be improved
- std::shared_ptr<DecoderType> decoder(
- new DictionaryDecoder<TYPE>(descr_, &dictionary));
+
+ auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
+ decoder->SetDict(&dictionary);
decoders_[encoding] = decoder;
current_decoder_ = decoders_[encoding].get();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
index be6b42e..785db08 100644
--- a/src/parquet/column/scanner-test.cc
+++ b/src/parquet/column/scanner-test.cc
@@ -40,16 +40,6 @@ namespace parquet_cpp {
using schema::NodePtr;
-bool operator==(const Int96& a, const Int96& b) {
- return a.value[0] == b.value[0] &&
- a.value[1] == b.value[1] &&
- a.value[2] == b.value[2];
-}
-
-bool operator==(const ByteArray& a, const ByteArray& b) {
- return a.len == b.len && 0 == memcmp(a.ptr, b.ptr, a.len);
-}
-
static int FLBA_LENGTH = 12;
bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
@@ -57,16 +47,10 @@ bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
namespace test {
-template <int N> class TypeValue {
- public:
- static const int value = N;
-};
-template <int N> const int TypeValue<N>::value;
-
-template <typename TYPE>
+template <typename Type>
class TestFlatScanner : public ::testing::Test {
public:
- typedef typename type_traits<TYPE::value>::value_type T;
+ typedef typename Type::c_type T;
void InitValues() {
random_numbers(num_values_, 0, std::numeric_limits<T>::min(),
@@ -106,7 +90,7 @@ class TestFlatScanner : public ::testing::Test {
// Create values
values_.resize(num_values_);
InitValues();
- Paginate<TYPE::value>(d, values_, def_levels_, max_def_level,
+ Paginate<Type::type_num>(d, values_, def_levels_, max_def_level,
rep_levels_, max_rep_level, levels_per_page, values_per_page, pages_);
}
@@ -116,8 +100,8 @@ class TestFlatScanner : public ::testing::Test {
}
void CheckResults(int batch_size, const ColumnDescriptor *d) {
- TypedScanner<TYPE::value>* scanner =
- reinterpret_cast<TypedScanner<TYPE::value>* >(scanner_.get());
+ TypedScanner<Type::type_num>* scanner =
+ reinterpret_cast<TypedScanner<Type::type_num>* >(scanner_.get());
T val;
bool is_null;
int16_t def_level;
@@ -158,14 +142,11 @@ class TestFlatScanner : public ::testing::Test {
void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3) {
NodePtr type;
- type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
- static_cast<Type::type>(TYPE::value));
+ type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::type_num);
d1.reset(new ColumnDescriptor(type, 0, 0));
- type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL,
- static_cast<Type::type>(TYPE::value));
+ type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, Type::type_num);
d2.reset(new ColumnDescriptor(type, 4, 0));
- type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED,
- static_cast<Type::type>(TYPE::value));
+ type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, Type::type_num);
d3.reset(new ColumnDescriptor(type, 4, 2));
}
@@ -194,18 +175,18 @@ class TestFlatScanner : public ::testing::Test {
};
template<>
-void TestFlatScanner<TypeValue<Type::BOOLEAN> >::InitValues() {
+void TestFlatScanner<BooleanType>::InitValues() {
values_ = flip_coins(num_values_, 0);
}
template<>
-void TestFlatScanner<TypeValue<Type::INT96> >::InitValues() {
+void TestFlatScanner<Int96Type>::InitValues() {
random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(),
std::numeric_limits<int32_t>::max(), values_.data());
}
template<>
-void TestFlatScanner<TypeValue<Type::BYTE_ARRAY> >::InitValues() {
+void TestFlatScanner<ByteArrayType>::InitValues() {
int max_byte_array_len = 12;
int num_bytes = max_byte_array_len + sizeof(uint32_t);
size_t nbytes = num_values_ * num_bytes;
@@ -215,7 +196,7 @@ void TestFlatScanner<TypeValue<Type::BYTE_ARRAY> >::InitValues() {
}
template<>
-void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitValues() {
+void TestFlatScanner<FLBAType>::InitValues() {
size_t nbytes = num_values_ * FLBA_LENGTH;
data_buffer_.resize(nbytes);
random_fixed_byte_array(num_values_, 0, data_buffer_.data(), FLBA_LENGTH,
@@ -223,7 +204,7 @@ void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitValues() {
}
template<>
-void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitDescriptors(
+void TestFlatScanner<FLBAType>::InitDescriptors(
std::shared_ptr<ColumnDescriptor>& d1, std::shared_ptr<ColumnDescriptor>& d2,
std::shared_ptr<ColumnDescriptor>& d3) {
NodePtr type = schema::PrimitiveNode::MakeFLBA("c1", Repetition::REQUIRED,
@@ -237,18 +218,13 @@ void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitDescriptors(
d3.reset(new ColumnDescriptor(type, 4, 2));
}
-typedef TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY>> TestFlatFLBAScanner;
+typedef TestFlatScanner<FLBAType> TestFlatFLBAScanner;
static int num_levels_per_page = 100;
static int num_pages = 20;
static int batch_size = 32;
-typedef ::testing::Types<TypeValue<Type::BOOLEAN>, TypeValue<Type::INT32>,
- TypeValue<Type::INT64>, TypeValue<Type::INT96>, TypeValue<Type::FLOAT>,
- TypeValue<Type::DOUBLE>, TypeValue<Type::BYTE_ARRAY>,
- TypeValue<Type::FIXED_LEN_BYTE_ARRAY> > Primitives;
-
-TYPED_TEST_CASE(TestFlatScanner, Primitives);
+TYPED_TEST_CASE(TestFlatScanner, ParquetTypes);
TYPED_TEST(TestFlatScanner, TestScanner) {
this->ExecuteAll(num_pages, num_levels_per_page, batch_size);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/codec-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec-test.cc b/src/parquet/compression/codec-test.cc
index 610fb37..285559a 100644
--- a/src/parquet/compression/codec-test.cc
+++ b/src/parquet/compression/codec-test.cc
@@ -15,14 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+#include <gtest/gtest.h>
#include <cstdint>
#include <string>
#include <vector>
-#include <gtest/gtest.h>
-#include "parquet/util/test-common.h"
-
#include "parquet/compression/codec.h"
+#include "parquet/util/test-common.h"
using std::string;
using std::vector;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
index bc73f02..df15d61 100644
--- a/src/parquet/compression/codec.h
+++ b/src/parquet/compression/codec.h
@@ -18,11 +18,11 @@
#ifndef PARQUET_COMPRESSION_CODEC_H
#define PARQUET_COMPRESSION_CODEC_H
+#include <zlib.h>
+
#include <cstdint>
#include <memory>
-#include <zlib.h>
-
#include "parquet/exception.h"
#include "parquet/types.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/lz4-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc
index a131031..81413bb 100644
--- a/src/parquet/compression/lz4-codec.cc
+++ b/src/parquet/compression/lz4-codec.cc
@@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-#include "parquet/compression/codec.h"
-
#include <lz4.h>
#include <cstdint>
+#include "parquet/compression/codec.h"
#include "parquet/exception.h"
namespace parquet_cpp {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/snappy-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc
index 91590db..991dd04 100644
--- a/src/parquet/compression/snappy-codec.cc
+++ b/src/parquet/compression/snappy-codec.cc
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#include "parquet/compression/codec.h"
-
#include <snappy.h>
#include <cstdint>
#include <cstdlib>
+#include "parquet/compression/codec.h"
#include "parquet/exception.h"
namespace parquet_cpp {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt
index c9349af..eb4cc3c 100644
--- a/src/parquet/encodings/CMakeLists.txt
+++ b/src/parquet/encodings/CMakeLists.txt
@@ -26,4 +26,4 @@ install(FILES
plain-encoding.h
DESTINATION include/parquet/encodings)
-ADD_PARQUET_TEST(plain-encoding-test)
+ADD_PARQUET_TEST(encoding-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index b52aefb..eed0659 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -20,10 +20,16 @@
#include <algorithm>
#include <cstdint>
+#include <iostream>
+#include <limits>
#include <vector>
#include "parquet/encodings/decoder.h"
#include "parquet/encodings/encoder.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/dict-encoding.h"
+#include "parquet/util/hash-util.h"
+#include "parquet/util/mem-pool.h"
#include "parquet/util/rle-encoding.h"
namespace parquet_cpp {
@@ -36,14 +42,12 @@ class DictionaryDecoder : public Decoder<TYPE> {
// Initializes the dictionary with values from 'dictionary'. The data in
// dictionary is not guaranteed to persist in memory after this call so the
// dictionary decoder needs to copy the data out if necessary.
- DictionaryDecoder(const ColumnDescriptor* descr,
- Decoder<TYPE>* dictionary)
+ explicit DictionaryDecoder(const ColumnDescriptor* descr)
: Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY) {
- Init(dictionary);
}
// Perform type-specific initialization
- void Init(Decoder<TYPE>* dictionary);
+ void SetDict(Decoder<TYPE>* dictionary);
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
@@ -83,20 +87,20 @@ class DictionaryDecoder : public Decoder<TYPE> {
};
template <int TYPE>
-inline void DictionaryDecoder<TYPE>::Init(Decoder<TYPE>* dictionary) {
+inline void DictionaryDecoder<TYPE>::SetDict(Decoder<TYPE>* dictionary) {
int num_dictionary_values = dictionary->values_left();
dictionary_.resize(num_dictionary_values);
dictionary->Decode(&dictionary_[0], num_dictionary_values);
}
template <>
-inline void DictionaryDecoder<Type::BOOLEAN>::Init(
+inline void DictionaryDecoder<Type::BOOLEAN>::SetDict(
Decoder<Type::BOOLEAN>* dictionary) {
ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
}
template <>
-inline void DictionaryDecoder<Type::BYTE_ARRAY>::Init(
+inline void DictionaryDecoder<Type::BYTE_ARRAY>::SetDict(
Decoder<Type::BYTE_ARRAY>* dictionary) {
int num_dictionary_values = dictionary->values_left();
dictionary_.resize(num_dictionary_values);
@@ -116,7 +120,7 @@ inline void DictionaryDecoder<Type::BYTE_ARRAY>::Init(
}
template <>
-inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Init(
+inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::SetDict(
Decoder<Type::FIXED_LEN_BYTE_ARRAY>* dictionary) {
int num_dictionary_values = dictionary->values_left();
dictionary_.resize(num_dictionary_values);
@@ -134,6 +138,297 @@ inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Init(
}
}
+// ----------------------------------------------------------------------
+// Dictionary encoder
+
+// Initially imported from Apache Impala on 2016-02-22, and has been modified
+// since for parquet-cpp
+
+// Initially 1024 elements
+static constexpr int INITIAL_HASH_TABLE_SIZE = 1 << 10;
+
+typedef int32_t hash_slot_t;
+static constexpr hash_slot_t HASH_SLOT_EMPTY = std::numeric_limits<int32_t>::max();
+
+// The maximum load factor for the hash table before resizing.
+static constexpr double MAX_HASH_LOAD = 0.7;
+
+/// See the dictionary encoding section of https://github.com/Parquet/parquet-format.
+/// The encoding supports streaming encoding. Values are encoded as they are added while
+/// the dictionary is being constructed. At any time, the buffered values can be
+/// written out with the current dictionary size. More values can then be added to
+/// the encoder, including new dictionary entries.
+class DictEncoderBase {
+ public:
+ virtual ~DictEncoderBase() {
+ DCHECK(buffered_indices_.empty());
+ }
+
+ /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
+ /// dict_encoded_size() bytes.
+ virtual void WriteDict(uint8_t* buffer) = 0;
+
+ /// The number of entries in the dictionary.
+ virtual int num_entries() const = 0;
+
+ /// Clears all the indices (but leaves the dictionary).
+ void ClearIndices() { buffered_indices_.clear(); }
+
+ /// Returns a conservative estimate of the number of bytes needed to encode the buffered
+ /// indices. Used to size the buffer passed to WriteIndices().
+ int EstimatedDataEncodedSize() {
+ return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size());
+ }
+
+ /// The minimum bit width required to encode the currently buffered indices.
+ int bit_width() const {
+ if (UNLIKELY(num_entries() == 0)) return 0;
+ if (UNLIKELY(num_entries() == 1)) return 1;
+ return BitUtil::Log2(num_entries());
+ }
+
+ /// Writes out any buffered indices to buffer preceded by the bit width of this data.
+ /// Returns the number of bytes written.
+ /// If the supplied buffer is not big enough, returns -1.
+ /// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize()
+ /// to size buffer.
+ int WriteIndices(uint8_t* buffer, int buffer_len);
+
+ int hash_table_size() { return hash_table_size_; }
+ int dict_encoded_size() { return dict_encoded_size_; }
+
+ protected:
+ explicit DictEncoderBase(MemPool* pool) :
+ hash_table_size_(INITIAL_HASH_TABLE_SIZE),
+ mod_bitmask_(hash_table_size_ - 1),
+ hash_slots_(hash_table_size_, HASH_SLOT_EMPTY),
+ pool_(pool),
+ dict_encoded_size_(0) {}
+
+ /// Size of the table. Must be a power of 2.
+ int hash_table_size_;
+
+ // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j %
+ // hash_table_size_, but uses far fewer CPU cycles
+ int mod_bitmask_;
+
+ // Open-addressing hash table with linear probing; it is doubled in size once
+ // the load factor exceeds MAX_HASH_LOAD. Each slot holds either
+ // HASH_SLOT_EMPTY or an index into the uniques_ array.
+ std::vector<hash_slot_t> hash_slots_;
+
+ // For ByteArray / FixedLenByteArray data. Not owned
+ MemPool* pool_;
+
+ /// Indices that have not yet been written out by WriteIndices().
+ std::vector<int> buffered_indices_;
+
+ /// The number of bytes needed to encode the dictionary.
+ int dict_encoded_size_;
+};
+
+template <typename T>
+class DictEncoder : public DictEncoderBase {
+ public:
+ explicit DictEncoder(MemPool* pool = nullptr, int type_length = -1) :
+ DictEncoderBase(pool),
+ type_length_(type_length) { }
+
+ // TODO(wesm): think about how to address the construction semantics in
+ // encodings/dictionary-encoding.h
+ void set_mem_pool(MemPool* pool) {
+ pool_ = pool;
+ }
+
+ void set_type_length(int type_length) {
+ type_length_ = type_length;
+ }
+
+ /// Encode value. Note that this does not actually write any data, just
+ /// buffers the value's index to be written later.
+ void Put(const T& value);
+
+ virtual void WriteDict(uint8_t* buffer);
+
+ virtual int num_entries() const { return uniques_.size(); }
+
+ private:
+ // The unique observed values
+ std::vector<T> uniques_;
+
+ bool SlotDifferent(const T& v, hash_slot_t slot);
+ void DoubleTableSize();
+
+ /// Size of each encoded dictionary value. -1 for variable-length types.
+ int type_length_;
+
+ /// Hash function for mapping a value to a bucket.
+ inline uint32_t Hash(const T& value) const;
+
+ /// Appends value to uniques_ and updates dict_encoded_size_ (Put() sets the hash slot)
+ void AddDictKey(const T& value);
+};
+
+template<typename T>
+inline uint32_t DictEncoder<T>::Hash(const T& value) const {
+ return HashUtil::Hash(&value, sizeof(value), 0);
+}
+
+template<>
+inline uint32_t DictEncoder<ByteArray>::Hash(const ByteArray& value) const {
+ return HashUtil::Hash(value.ptr, value.len, 0);
+}
+
+template<>
+inline uint32_t DictEncoder<FixedLenByteArray>::Hash(
+ const FixedLenByteArray& value) const {
+ return HashUtil::Hash(value.ptr, type_length_, 0);
+}
+
+template <typename T>
+inline bool DictEncoder<T>::SlotDifferent(const T& v, hash_slot_t slot) {
+ return v != uniques_[slot];
+}
+
+template <>
+inline bool DictEncoder<FixedLenByteArray>::SlotDifferent(
+ const FixedLenByteArray& v, hash_slot_t slot) {
+ return 0 != memcmp(v.ptr, uniques_[slot].ptr, type_length_);
+}
+
+template <typename T>
+inline void DictEncoder<T>::Put(const T& v) {
+ uint32_t j = Hash(v) & mod_bitmask_;
+ hash_slot_t index = hash_slots_[j];
+
+ // Probe until we reach an empty slot or the slot that already holds v
+ while (HASH_SLOT_EMPTY != index && SlotDifferent(v, index)) {
+ // Linear probing
+ ++j;
+ if (j == hash_table_size_) j = 0;
+ index = hash_slots_[j];
+ }
+
+ int bytes_added = 0;
+ if (index == HASH_SLOT_EMPTY) {
+ // Not in the hash table, so we insert it now
+ index = uniques_.size();
+ hash_slots_[j] = index;
+ AddDictKey(v);
+
+ if (UNLIKELY(uniques_.size() >
+ static_cast<size_t>(hash_table_size_ * MAX_HASH_LOAD))) {
+ DoubleTableSize();
+ }
+ }
+
+ buffered_indices_.push_back(index);
+}
+
+template <typename T>
+inline void DictEncoder<T>::DoubleTableSize() {
+ int new_size = hash_table_size_ * 2;
+ std::vector<hash_slot_t> new_hash_slots(new_size, HASH_SLOT_EMPTY);
+ hash_slot_t index, slot;
+ uint32_t j;
+ for (int i = 0; i < hash_table_size_; ++i) {
+ index = hash_slots_[i];
+
+ if (index == HASH_SLOT_EMPTY) {
+ continue;
+ }
+
+ // Compute the hash value mod the new table size to start looking for an
+ // empty slot
+ const T& v = uniques_[index];
+
+ // Find an empty slot in the new hash table
+ j = Hash(v) & (new_size - 1);
+ slot = new_hash_slots[j];
+ while (HASH_SLOT_EMPTY != slot && SlotDifferent(v, slot)) {
+ ++j;
+ if (j == new_size) j = 0;
+ slot = new_hash_slots[j];
+ }
+
+ // Copy the old slot index to the new hash table
+ new_hash_slots[j] = index;
+ }
+
+ hash_table_size_ = new_size;
+ mod_bitmask_ = new_size - 1;
+ new_hash_slots.swap(hash_slots_);
+}
+
+template<typename T>
+inline void DictEncoder<T>::AddDictKey(const T& v) {
+ uniques_.push_back(v);
+ dict_encoded_size_ += sizeof(T);
+}
+
+template<>
+inline void DictEncoder<ByteArray>::AddDictKey(const ByteArray& v) {
+ uint8_t* heap = pool_->Allocate(v.len);
+ if (UNLIKELY(v.len > 0 && heap == nullptr)) {
+ throw ParquetException("out of memory");
+ }
+ memcpy(heap, v.ptr, v.len);
+
+ uniques_.push_back(ByteArray(v.len, heap));
+ dict_encoded_size_ += v.len + sizeof(uint32_t);
+}
+
+template<>
+inline void DictEncoder<FixedLenByteArray>::AddDictKey(const FixedLenByteArray& v) {
+ uint8_t* heap = pool_->Allocate(type_length_);
+ if (UNLIKELY(type_length_ > 0 && heap == nullptr)) {
+ throw ParquetException("out of memory");
+ }
+ memcpy(heap, v.ptr, type_length_);
+
+ uniques_.push_back(FixedLenByteArray(heap));
+ dict_encoded_size_ += type_length_;
+}
+
+template <typename T>
+inline void DictEncoder<T>::WriteDict(uint8_t* buffer) {
+ // For primitive types, only a memcpy
+ memcpy(buffer, &uniques_[0], sizeof(T) * uniques_.size());
+}
+
+// ByteArray and FLBA values point into the memory pool, so serialize them one by one
+template <>
+inline void DictEncoder<ByteArray>::WriteDict(uint8_t* buffer) {
+ for (const ByteArray& v : uniques_) {
+ memcpy(buffer, reinterpret_cast<const void*>(&v.len), sizeof(uint32_t));
+ buffer += sizeof(uint32_t);
+ memcpy(buffer, v.ptr, v.len);
+ buffer += v.len;
+ }
+}
+
+template <>
+inline void DictEncoder<FixedLenByteArray>::WriteDict(uint8_t* buffer) {
+ for (const FixedLenByteArray& v : uniques_) {
+ memcpy(buffer, v.ptr, type_length_);
+ buffer += type_length_;
+ }
+}
+
+inline int DictEncoderBase::WriteIndices(uint8_t* buffer, int buffer_len) {
+ // Write bit width in first byte
+ *buffer = bit_width();
+ ++buffer;
+ --buffer_len;
+
+ RleEncoder encoder(buffer, buffer_len, bit_width());
+ for (int index : buffered_indices_) {
+ if (!encoder.Put(index)) return -1;
+ }
+ encoder.Flush();
+ return 1 + encoder.len();
+}
+
} // namespace parquet_cpp
#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index 50ba48f..ce91a29 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -39,11 +39,6 @@ class Encoder {
virtual ~Encoder() {}
- // Subclasses should override the ones they support
- virtual void Encode(const T* src, int num_values, OutputStream* dst) {
- throw ParquetException("Encoder does not implement this type.");
- }
-
const Encoding::type encoding() const { return encoding_; }
protected:
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
new file mode 100644
index 0000000..10310ed
--- /dev/null
+++ b/src/parquet/encodings/encoding-test.cc
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "parquet/schema/descriptor.h"
+#include "parquet/encodings/dictionary-encoding.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/types.h"
+#include "parquet/schema/types.h"
+#include "parquet/util/bit-util.h"
+#include "parquet/util/buffer.h"
+#include "parquet/util/dict-encoding.h"
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet_cpp {
+
+namespace test {
+
+TEST(VectorBooleanTest, TestEncodeDecode) {
+ // PARQUET-454
+ int nvalues = 10000;
+ int nbytes = BitUtil::Ceil(nvalues, 8);
+
+ // seed the prng so failure is deterministic
+ vector<bool> draws = flip_coins_seed(nvalues, 0.5, 0);
+
+ PlainEncoder<Type::BOOLEAN> encoder(nullptr);
+ PlainDecoder<Type::BOOLEAN> decoder(nullptr);
+
+ InMemoryOutputStream dst;
+ encoder.Encode(draws, nvalues, &dst);
+
+ std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer();
+ ASSERT_EQ(nbytes, encode_buffer->size());
+
+ vector<uint8_t> decode_buffer(nbytes);
+ const uint8_t* decode_data = &decode_buffer[0];
+
+ decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size());
+ int values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
+ ASSERT_EQ(nvalues, values_decoded);
+
+ for (int i = 0; i < nvalues; ++i) {
+ ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i;
+ }
+}
+
+// ----------------------------------------------------------------------
+// test data generation
+
+template <typename T>
+void GenerateData(int num_values, T* out, vector<uint8_t>* heap) {
+ // seed the prng so failure is deterministic
+ random_numbers(num_values, 0, std::numeric_limits<T>::min(),
+ std::numeric_limits<T>::max(), out);
+}
+
+template <>
+void GenerateData<bool>(int num_values, bool* out, vector<uint8_t>* heap) {
+ // seed the prng so failure is deterministic
+ random_bools(num_values, 0.5, 0, out);
+}
+
+template <>
+void GenerateData<Int96>(int num_values, Int96* out, vector<uint8_t>* heap) {
+ // seed the prng so failure is deterministic
+ random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
+ std::numeric_limits<int32_t>::max(), out);
+}
+
+template <>
+void GenerateData<ByteArray>(int num_values, ByteArray* out, vector<uint8_t>* heap) {
+ // seed the prng so failure is deterministic
+ int max_byte_array_len = 12;
+ int num_bytes = max_byte_array_len + sizeof(uint32_t);
+ heap->resize(num_values * max_byte_array_len);
+ random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len);
+}
+
+static int flba_length = 8;
+
+template <>
+void GenerateData<FLBA>(int num_values, FLBA* out, vector<uint8_t>* heap) {
+ // seed the prng so failure is deterministic
+ heap->resize(num_values * flba_length);
+ random_fixed_byte_array(num_values, 0, heap->data(), flba_length, out);
+}
+
+template <typename T>
+void VerifyResults(T* result, T* expected, int num_values) {
+ for (int i = 0; i < num_values; ++i) {
+ ASSERT_EQ(expected[i], result[i]) << i;
+ }
+}
+
+template <>
+void VerifyResults<FLBA>(FLBA* result, FLBA* expected, int num_values) {
+ for (int i = 0; i < num_values; ++i) {
+ ASSERT_EQ(0, memcmp(expected[i].ptr, result[i].ptr, flba_length)) << i;
+ }
+}
+
+// ----------------------------------------------------------------------
+// Create some column descriptors
+
+template <typename T>
+std::shared_ptr<ColumnDescriptor> ExampleDescr() {
+ return nullptr;
+}
+
+template <>
+std::shared_ptr<ColumnDescriptor> ExampleDescr<FLBA>() {
+ auto node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL,
+ flba_length, LogicalType::UTF8);
+ return std::make_shared<ColumnDescriptor>(node, 0, 0);
+}
+
+// ----------------------------------------------------------------------
+// Plain encoding tests
+
+template <typename Type>
+class TestEncodingBase : public ::testing::Test {
+ public:
+ typedef typename Type::c_type T;
+ static constexpr int TYPE = Type::type_num;
+
+ void SetUp() {
+ descr_ = ExampleDescr<T>();
+ if (descr_) {
+ type_length_ = descr_->type_length();
+ }
+ }
+
+ void InitData(int nvalues, int repeats) {
+ num_values_ = nvalues * repeats;
+ input_bytes_.resize(num_values_ * sizeof(T));
+ output_bytes_.resize(num_values_ * sizeof(T));
+ draws_ = reinterpret_cast<T*>(input_bytes_.data());
+ decode_buf_ = reinterpret_cast<T*>(output_bytes_.data());
+ GenerateData<T>(nvalues, draws_, &data_buffer_);
+
+ // add some repeated values
+ for (int j = 1; j < repeats; ++j) {
+ for (int i = 0; i < nvalues; ++i) {
+ draws_[nvalues * j + i] = draws_[i];
+ }
+ }
+ }
+
+ virtual void CheckRoundtrip() = 0;
+
+ void Execute(int nvalues, int repeats) {
+ InitData(nvalues, repeats);
+ CheckRoundtrip();
+ }
+
+ protected:
+ MemPool pool_;
+
+ int num_values_;
+ int type_length_;
+ T* draws_;
+ T* decode_buf_;
+ vector<uint8_t> input_bytes_;
+ vector<uint8_t> output_bytes_;
+ vector<uint8_t> data_buffer_;
+
+ std::shared_ptr<Buffer> encode_buffer_;
+ std::shared_ptr<ColumnDescriptor> descr_;
+};
+
+// Members of a dependent base class are not found by unqualified lookup in
+// templated subclasses. Possibly find an alternative to this layering at some point
+#define USING_BASE_MEMBERS() \
+ using TestEncodingBase<Type>::pool_; \
+ using TestEncodingBase<Type>::descr_; \
+ using TestEncodingBase<Type>::num_values_; \
+ using TestEncodingBase<Type>::draws_; \
+ using TestEncodingBase<Type>::data_buffer_; \
+ using TestEncodingBase<Type>::type_length_; \
+ using TestEncodingBase<Type>::encode_buffer_; \
+ using TestEncodingBase<Type>::decode_buf_;
+
+
+template <typename Type>
+class TestPlainEncoding : public TestEncodingBase<Type> {
+ public:
+ typedef typename Type::c_type T;
+ static constexpr int TYPE = Type::type_num;
+
+ virtual void CheckRoundtrip() {
+ PlainEncoder<TYPE> encoder(descr_.get());
+ PlainDecoder<TYPE> decoder(descr_.get());
+ InMemoryOutputStream dst;
+ encoder.Encode(draws_, num_values_, &dst);
+
+ encode_buffer_ = dst.GetBuffer();
+
+ decoder.SetData(num_values_, encode_buffer_->data(),
+ encode_buffer_->size());
+ int values_decoded = decoder.Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+ VerifyResults<T>(decode_buf_, draws_, num_values_);
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+};
+
+TYPED_TEST_CASE(TestPlainEncoding, ParquetTypes);
+
+TYPED_TEST(TestPlainEncoding, BasicRoundTrip) {
+ this->Execute(10000, 1);
+}
+
+// ----------------------------------------------------------------------
+// Dictionary encoding tests
+
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+ ByteArrayType, FLBAType> DictEncodedTypes;
+
+template <typename Type>
+class TestDictionaryEncoding : public TestEncodingBase<Type> {
+ public:
+ typedef typename Type::c_type T;
+ static constexpr int TYPE = Type::type_num;
+
+ void CheckRoundtrip() {
+ DictEncoder<T> encoder(&pool_, type_length_);
+
+ dict_buffer_ = std::make_shared<OwnedMutableBuffer>();
+ auto indices = std::make_shared<OwnedMutableBuffer>();
+
+ ASSERT_NO_THROW(
+ {
+ for (int i = 0; i < num_values_; ++i) {
+ encoder.Put(draws_[i]);
+ }
+ });
+ dict_buffer_->Resize(encoder.dict_encoded_size());
+ encoder.WriteDict(dict_buffer_->mutable_data());
+
+ indices->Resize(encoder.EstimatedDataEncodedSize());
+ int actual_bytes = encoder.WriteIndices(indices->mutable_data(),
+ indices->size());
+ indices->Resize(actual_bytes);
+
+ PlainDecoder<TYPE> dict_decoder(descr_.get());
+ dict_decoder.SetData(encoder.num_entries(), dict_buffer_->data(),
+ dict_buffer_->size());
+
+ DictionaryDecoder<TYPE> decoder(descr_.get());
+ decoder.SetDict(&dict_decoder);
+
+ decoder.SetData(num_values_, indices->data(), indices->size());
+ int values_decoded = decoder.Decode(decode_buf_, num_values_);
+ ASSERT_EQ(num_values_, values_decoded);
+
+ // TODO(wesm): The DictionaryDecoder must stay alive because the decoded
+ // values' data is owned by a buffer inside the DictionaryDecoder. We
+ // should revisit when data lifetime is reviewed more generally.
+ VerifyResults<T>(decode_buf_, draws_, num_values_);
+ }
+
+ protected:
+ USING_BASE_MEMBERS();
+ std::shared_ptr<OwnedMutableBuffer> dict_buffer_;
+};
+
+TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes);
+
+TYPED_TEST(TestDictionaryEncoding, BasicRoundTrip) {
+ this->Execute(2500, 2);
+}
+
+TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) {
+ PlainDecoder<Type::BOOLEAN> dict_decoder(nullptr);
+ DictionaryDecoder<Type::BOOLEAN> decoder(nullptr);
+
+ ASSERT_THROW(decoder.SetDict(&dict_decoder), ParquetException);
+}
+
+} // namespace test
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
deleted file mode 100644
index 7ebd21f..0000000
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ /dev/null
@@ -1,232 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdlib>
-#include <cstring>
-#include <string>
-#include <vector>
-
-#include <gtest/gtest.h>
-
-#include "parquet/schema/descriptor.h"
-#include "parquet/encodings/plain-encoding.h"
-#include "parquet/types.h"
-#include "parquet/schema/types.h"
-#include "parquet/util/bit-util.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/output.h"
-#include "parquet/util/test-common.h"
-
-using std::string;
-using std::vector;
-
-namespace parquet_cpp {
-
-namespace test {
-
-TEST(VectorBooleanTest, TestEncodeDecode) {
- // PARQUET-454
- size_t nvalues = 10000;
- size_t nbytes = BitUtil::Ceil(nvalues, 8);
-
- // seed the prng so failure is deterministic
- vector<bool> draws = flip_coins_seed(nvalues, 0.5, 0);
-
- PlainEncoder<Type::BOOLEAN> encoder(nullptr);
- PlainDecoder<Type::BOOLEAN> decoder(nullptr);
-
- InMemoryOutputStream dst;
- encoder.Encode(draws, nvalues, &dst);
-
- std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer();
- ASSERT_EQ(nbytes, encode_buffer->size());
-
- vector<uint8_t> decode_buffer(nbytes);
- const uint8_t* decode_data = &decode_buffer[0];
-
- decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size());
- size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
- ASSERT_EQ(nvalues, values_decoded);
-
- for (size_t i = 0; i < nvalues; ++i) {
- ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i;
- }
-}
-
-template<typename T, int TYPE>
-class EncodeDecode{
- public:
- void init_data(int nvalues) {
- num_values_ = nvalues;
- input_bytes_.resize(num_values_ * sizeof(T));
- output_bytes_.resize(num_values_ * sizeof(T));
- draws_ = reinterpret_cast<T*>(input_bytes_.data());
- decode_buf_ = reinterpret_cast<T*>(output_bytes_.data());
- }
-
- void generate_data() {
- // seed the prng so failure is deterministic
- random_numbers(num_values_, 0, std::numeric_limits<T>::min(),
- std::numeric_limits<T>::max(), draws_);
- }
-
- void encode_decode(ColumnDescriptor *d) {
- PlainEncoder<TYPE> encoder(d);
- PlainDecoder<TYPE> decoder(d);
-
- InMemoryOutputStream dst;
- encoder.Encode(draws_, num_values_, &dst);
-
- encode_buffer_ = dst.GetBuffer();
-
- decoder.SetData(num_values_, encode_buffer_->data(),
- encode_buffer_->size());
- size_t values_decoded = decoder.Decode(decode_buf_, num_values_);
- ASSERT_EQ(num_values_, values_decoded);
- }
-
- void verify_results() {
- for (size_t i = 0; i < num_values_; ++i) {
- ASSERT_EQ(draws_[i], decode_buf_[i]) << i;
- }
- }
-
- void execute(int nvalues, ColumnDescriptor *d) {
- init_data(nvalues);
- generate_data();
- encode_decode(d);
- verify_results();
- }
-
- private:
- int num_values_;
- T* draws_;
- T* decode_buf_;
- vector<uint8_t> input_bytes_;
- vector<uint8_t> output_bytes_;
- vector<uint8_t> data_buffer_;
-
- std::shared_ptr<Buffer> encode_buffer_;
-};
-
-template<>
-void EncodeDecode<bool, Type::BOOLEAN>::generate_data() {
- // seed the prng so failure is deterministic
- random_bools(num_values_, 0.5, 0, draws_);
-}
-
-template<>
-void EncodeDecode<Int96, Type::INT96>::generate_data() {
- // seed the prng so failure is deterministic
- random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(),
- std::numeric_limits<int32_t>::max(), draws_);
-}
-
-template<>
-void EncodeDecode<Int96, Type::INT96>::verify_results() {
- for (size_t i = 0; i < num_values_; ++i) {
- ASSERT_EQ(draws_[i].value[0], decode_buf_[i].value[0]) << i;
- ASSERT_EQ(draws_[i].value[1], decode_buf_[i].value[1]) << i;
- ASSERT_EQ(draws_[i].value[2], decode_buf_[i].value[2]) << i;
- }
-}
-
-template<>
-void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::generate_data() {
- // seed the prng so failure is deterministic
- int max_byte_array_len = 12;
- int num_bytes = max_byte_array_len + sizeof(uint32_t);
- size_t nbytes = num_values_ * num_bytes;
- data_buffer_.resize(nbytes);
- random_byte_array(num_values_, 0, data_buffer_.data(), draws_,
- max_byte_array_len);
-}
-
-template<>
-void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::verify_results() {
- for (size_t i = 0; i < num_values_; ++i) {
- ASSERT_EQ(draws_[i].len, decode_buf_[i].len) << i;
- ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, draws_[i].len)) << i;
- }
-}
-
-static int flba_length = 8;
-template<>
-void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::generate_data() {
- // seed the prng so failure is deterministic
- size_t nbytes = num_values_ * flba_length;
- data_buffer_.resize(nbytes);
- ASSERT_EQ(nbytes, data_buffer_.size());
- random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_);
-}
-
-template<>
-void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::verify_results() {
- for (size_t i = 0; i < num_values_; ++i) {
- ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, flba_length)) << i;
- }
-}
-
-int num_values = 10000;
-
-TEST(BoolEncodeDecode, TestEncodeDecode) {
- EncodeDecode<bool, Type::BOOLEAN> obj;
- obj.execute(num_values, nullptr);
-}
-
-TEST(Int32EncodeDecode, TestEncodeDecode) {
- EncodeDecode<int32_t, Type::INT32> obj;
- obj.execute(num_values, nullptr);
-}
-
-TEST(Int64EncodeDecode, TestEncodeDecode) {
- EncodeDecode<int64_t, Type::INT64> obj;
- obj.execute(num_values, nullptr);
-}
-
-TEST(FloatEncodeDecode, TestEncodeDecode) {
- EncodeDecode<float, Type::FLOAT> obj;
- obj.execute(num_values, nullptr);
-}
-
-TEST(DoubleEncodeDecode, TestEncodeDecode) {
- EncodeDecode<double, Type::DOUBLE> obj;
- obj.execute(num_values, nullptr);
-}
-
-TEST(Int96EncodeDecode, TestEncodeDecode) {
- EncodeDecode<Int96, Type::INT96> obj;
- obj.execute(num_values, nullptr);
-}
-
-TEST(BAEncodeDecode, TestEncodeDecode) {
- EncodeDecode<ByteArray, Type::BYTE_ARRAY> obj;
- obj.execute(num_values, nullptr);
-}
-
-TEST(FLBAEncodeDecode, TestEncodeDecode) {
- schema::NodePtr node;
- node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL,
- flba_length, LogicalType::UTF8);
- ColumnDescriptor d(node, 0, 0);
- EncodeDecode<FixedLenByteArray, Type::FIXED_LEN_BYTE_ARRAY> obj;
- obj.execute(num_values, &d);
-}
-
-} // namespace test
-} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 83ee40c..9adabdf 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -40,7 +40,13 @@ class PlainDecoder : public Decoder<TYPE> {
explicit PlainDecoder(const ColumnDescriptor* descr) :
Decoder<TYPE>(descr, Encoding::PLAIN),
- data_(NULL), len_(0) {}
+ data_(NULL), len_(0) {
+ if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
+ type_length_ = descr_->type_length();
+ } else {
+ type_length_ = -1;
+ }
+ }
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
@@ -49,55 +55,69 @@ class PlainDecoder : public Decoder<TYPE> {
}
virtual int Decode(T* buffer, int max_values);
+
private:
+ using Decoder<TYPE>::descr_;
const uint8_t* data_;
int len_;
+ int type_length_;
};
-template <int TYPE>
-inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
- max_values = std::min(max_values, num_values_);
- int size = max_values * sizeof(T);
- if (len_ < size) ParquetException::EofException();
- memcpy(buffer, data_, size);
- data_ += size;
- len_ -= size;
- num_values_ -= max_values;
- return max_values;
+// Decode routine templated on C++ type rather than type enum
+template <typename T>
+inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
+ int type_length, T* out) {
+ int bytes_to_decode = num_values * sizeof(T);
+ if (data_size < bytes_to_decode) {
+ ParquetException::EofException();
+ }
+ memcpy(out, data, bytes_to_decode);
+ return bytes_to_decode;
}
-// Template specialization for BYTE_ARRAY
-// BA does not currently own its data
-// the lifetime is tied to the input stream
+// Template specialization for BYTE_ARRAY. The written values do not own their
+// own data.
template <>
-inline int PlainDecoder<Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
- int max_values) {
- max_values = std::min(max_values, num_values_);
- for (int i = 0; i < max_values; ++i) {
- uint32_t len = buffer[i].len = *reinterpret_cast<const uint32_t*>(data_);
- if (len_ < sizeof(uint32_t) + len) ParquetException::EofException();
- buffer[i].ptr = data_ + sizeof(uint32_t);
- data_ += sizeof(uint32_t) + len;
- len_ -= sizeof(uint32_t) + len;
+inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
+ int type_length, ByteArray* out) {
+ int bytes_decoded = 0;
+ int increment;
+ for (int i = 0; i < num_values; ++i) {
+ uint32_t len = out[i].len = *reinterpret_cast<const uint32_t*>(data);
+ increment = sizeof(uint32_t) + len;
+ if (data_size < increment) ParquetException::EofException();
+ out[i].ptr = data + sizeof(uint32_t);
+ data += increment;
+ data_size -= increment;
+ bytes_decoded += increment;
}
- num_values_ -= max_values;
- return max_values;
+ return bytes_decoded;
}
-// Template specialization for FIXED_LEN_BYTE_ARRAY
-// FLBA does not currently own its data
-// the lifetime is tied to the input stream
+// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not
+// own their own data.
template <>
-inline int PlainDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Decode(
- FixedLenByteArray* buffer, int max_values) {
- max_values = std::min(max_values, num_values_);
- int len = descr_->type_length();
- for (int i = 0; i < max_values; ++i) {
- if (len_ < len) ParquetException::EofException();
- buffer[i].ptr = data_;
- data_ += len;
- len_ -= len;
+inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
+ int num_values, int type_length, FixedLenByteArray* out) {
+ int bytes_to_decode = type_length * num_values;
+ if (data_size < bytes_to_decode) {
+ ParquetException::EofException();
}
+ for (int i = 0; i < num_values; ++i) {
+ out[i].ptr = data;
+ data += type_length;
+ data_size -= type_length;
+ }
+ return bytes_to_decode;
+}
+
+template <int TYPE>
+inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
+ max_values = std::min(max_values, num_values_);
+ int bytes_consumed = DecodePlain<T>(data_, len_, max_values,
+ type_length_, buffer);
+ data_ += bytes_consumed;
+ len_ -= bytes_consumed;
num_values_ -= max_values;
return max_values;
}
@@ -155,7 +175,7 @@ class PlainEncoder : public Encoder<TYPE> {
explicit PlainEncoder(const ColumnDescriptor* descr) :
Encoder<TYPE>(descr, Encoding::PLAIN) {}
- virtual void Encode(const T* src, int num_values, OutputStream* dst);
+ void Encode(const T* src, int num_values, OutputStream* dst);
};
template <>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 7aff74a..08d4607 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -31,8 +31,6 @@
namespace parquet_cpp {
-class SchemaDescriptor;
-
// 16 MB is the default maximum page header size
static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index be22e5a..e99140c 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -15,14 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+#include <gtest/gtest.h>
#include <cstdlib>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
-#include <gtest/gtest.h>
-
#include "parquet/file/reader.h"
#include "parquet/column/reader.h"
#include "parquet/column/scanner.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/schema/schema-descriptor-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc
index 7677615..3b1734b 100644
--- a/src/parquet/schema/schema-descriptor-test.cc
+++ b/src/parquet/schema/schema-descriptor-test.cc
@@ -17,13 +17,12 @@
// Schema / column descriptor correctness tests (from flat Parquet schemas)
+#include <gtest/gtest.h>
#include <cstdint>
#include <cstdlib>
#include <string>
#include <vector>
-#include <gtest/gtest.h>
-
#include "parquet/exception.h"
#include "parquet/schema/descriptor.h"
#include "parquet/schema/types.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 8c5e123..f59f6a9 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -128,11 +128,24 @@ struct PageType {
// ----------------------------------------------------------------------
struct ByteArray {
+ ByteArray() {}
+ ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {}
uint32_t len;
const uint8_t* ptr;
+
+ bool operator==(const ByteArray& other) const {
+ return this->len == other.len &&
+ 0 == memcmp(this->ptr, other.ptr, this->len);
+ }
+
+ bool operator!=(const ByteArray& other) const {
+ return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, this->len);
+ }
};
struct FixedLenByteArray {
+ FixedLenByteArray() {}
+ explicit FixedLenByteArray(const uint8_t* ptr) : ptr(ptr) {}
const uint8_t* ptr;
};
@@ -140,6 +153,14 @@ typedef FixedLenByteArray FLBA;
MANUALLY_ALIGNED_STRUCT(1) Int96 {
uint32_t value[3];
+
+ bool operator==(const Int96& other) const {
+ return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t));
+ }
+
+ bool operator!=(const Int96& other) const {
+ return !(*this == other);
+ }
};
STRUCT_END(Int96, 12);
@@ -241,6 +262,21 @@ struct type_traits<Type::FIXED_LEN_BYTE_ARRAY> {
static constexpr const char* printf_code = "s";
};
+template <Type::type TYPE>
+struct DataType {
+ static constexpr Type::type type_num = TYPE;
+ typedef typename type_traits<TYPE>::value_type c_type;
+};
+
+typedef DataType<Type::BOOLEAN> BooleanType;
+typedef DataType<Type::INT32> Int32Type;
+typedef DataType<Type::INT64> Int64Type;
+typedef DataType<Type::INT96> Int96Type;
+typedef DataType<Type::FLOAT> FloatType;
+typedef DataType<Type::DOUBLE> DoubleType;
+typedef DataType<Type::BYTE_ARRAY> ByteArrayType;
+typedef DataType<Type::FIXED_LEN_BYTE_ARRAY> FLBAType;
+
template <int TYPE>
inline std::string format_fwf(int width) {
std::stringstream ss;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 2b782fc..a009129 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -20,22 +20,27 @@ install(FILES
bit-stream-utils.h
bit-stream-utils.inline.h
bit-util.h
- cpu-info.h
- sse-info.h
+ buffer-builder.h
compiler-util.h
+ cpu-info.h
+ dict-encoding.h
+ hash-util.h
+ input.h
logging.h
macros.h
+ mem-pool.h
+ output.h
rle-encoding.h
stopwatch.h
- input.h
- output.h
+ sse-info.h
DESTINATION include/parquet/util)
add_library(parquet_util STATIC
buffer.cc
+ cpu-info.cc
input.cc
+ mem-pool.cc
output.cc
- cpu-info.cc
)
if(PARQUET_BUILD_TESTS)
@@ -58,5 +63,6 @@ endif()
ADD_PARQUET_TEST(bit-util-test)
ADD_PARQUET_TEST(buffer-test)
+ADD_PARQUET_TEST(mem-pool-test)
ADD_PARQUET_TEST(output-test)
ADD_PARQUET_TEST(rle-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-stream-utils.h b/src/parquet/util/bit-stream-utils.h
index b93b90e..3636128 100644
--- a/src/parquet/util/bit-stream-utils.h
+++ b/src/parquet/util/bit-stream-utils.h
@@ -20,9 +20,9 @@
#ifndef PARQUET_UTIL_BIT_STREAM_UTILS_H
#define PARQUET_UTIL_BIT_STREAM_UTILS_H
+#include <string.h>
#include <algorithm>
#include <cstdint>
-#include <string.h>
#include "parquet/util/compiler-util.h"
#include "parquet/util/logging.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util-test.cc b/src/parquet/util/bit-util-test.cc
index a8b6be0..5ea4c11 100644
--- a/src/parquet/util/bit-util-test.cc
+++ b/src/parquet/util/bit-util-test.cc
@@ -19,11 +19,12 @@
#include <stdlib.h>
#include <stdio.h>
-#include <iostream>
#include <limits.h>
+#include <gtest/gtest.h>
#include <boost/utility.hpp>
-#include <gtest/gtest.h>
+
+#include <iostream>
#include "parquet/util/bit-util.h"
#include "parquet/util/bit-stream-utils.inline.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/buffer-builder.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer-builder.h b/src/parquet/util/buffer-builder.h
new file mode 100644
index 0000000..6fab6c5
--- /dev/null
+++ b/src/parquet/util/buffer-builder.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Initially imported from Impala on 2016-02-23
+
+#ifndef PARQUET_UTIL_BUFFER_BUILDER_H
+#define PARQUET_UTIL_BUFFER_BUILDER_H
+
+#include <stdlib.h>
+#include <cstdint>
+
+namespace parquet_cpp {
+
+/// Utility class to build an in-memory buffer.
+class BufferBuilder {
+ public:
+ BufferBuilder(uint8_t* dst_buffer, int dst_len)
+ : buffer_(dst_buffer), capacity_(dst_len), size_(0) {
+ }
+
+ BufferBuilder(char* dst_buffer, int dst_len)
+ : buffer_(reinterpret_cast<uint8_t*>(dst_buffer)),
+ capacity_(dst_len), size_(0) {
+ }
+
+ inline void Append(const void* buffer, int len) {
+ memcpy(buffer_ + size_, buffer, len);
+ size_ += len;
+ }
+
+ template<typename T>
+ inline void Append(const T& v) {
+ Append(&v, sizeof(T));
+ }
+
+ int capacity() const { return capacity_; }
+ int size() const { return size_; }
+
+ private:
+ uint8_t* buffer_;
+ int capacity_;
+ int size_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_BUFFER_BUILDER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/cpu-info.cc b/src/parquet/util/cpu-info.cc
index 610fb62..2a9f59d 100644
--- a/src/parquet/util/cpu-info.cc
+++ b/src/parquet/util/cpu-info.cc
@@ -24,16 +24,18 @@
#include <sys/sysctl.h>
#endif
+#include <mmintrin.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
#include <boost/algorithm/string.hpp>
+
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <fstream>
-#include <mmintrin.h>
#include <sstream>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
#include <string>
#include "parquet/exception.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/dict-encoding.h b/src/parquet/util/dict-encoding.h
new file mode 100644
index 0000000..315b88e
--- /dev/null
+++ b/src/parquet/util/dict-encoding.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_DICT_ENCODING_H
+#define PARQUET_UTIL_DICT_ENCODING_H
+
+#include <algorithm>
+#include <cstdint>
+#include <limits>
+#include <vector>
+
+#include "parquet/types.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/hash-util.h"
+#include "parquet/util/mem-pool.h"
+#include "parquet/util/rle-encoding.h"
+
+namespace parquet_cpp {
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_DICT_ENCODING_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/hash-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/hash-util.h b/src/parquet/util/hash-util.h
new file mode 100644
index 0000000..5572ca9
--- /dev/null
+++ b/src/parquet/util/hash-util.h
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// From Apache Impala as of 2016-02-22
+
+#ifndef PARQUET_UTIL_HASH_UTIL_H
+#define PARQUET_UTIL_HASH_UTIL_H
+
+#include <cstdint>
+
+#include "parquet/util/compiler-util.h"
+#include "parquet/util/cpu-info.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/sse-util.h"
+
+namespace parquet_cpp {
+
+/// Utility class to compute hash values.
+class HashUtil {
+ public:
+ /// Compute the Crc32 hash for data using SSE4 instructions. The input hash
+ /// parameter is the current hash/seed value.
+ /// This should only be called if SSE is supported.
+ /// This is ~4x faster than Fnv/Boost Hash.
+ /// TODO: crc32 hashes with different seeds do not result in different hash functions.
+ /// The resulting hashes are correlated.
+ /// TODO: update this to also use SSE4_crc32_u64 and SSE4_crc32_u16 where appropriate.
+ static uint32_t CrcHash(const void* data, int32_t bytes, uint32_t hash) {
+ DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+ uint32_t words = bytes / sizeof(uint32_t);
+ bytes = bytes % sizeof(uint32_t);
+
+ const uint32_t* p = reinterpret_cast<const uint32_t*>(data);
+ while (words--) {
+ hash = SSE4_crc32_u32(hash, *p);
+ ++p;
+ }
+
+ const uint8_t* s = reinterpret_cast<const uint8_t*>(p);
+ while (bytes--) {
+ hash = SSE4_crc32_u8(hash, *s);
+ ++s;
+ }
+
+ // The lower half of the CRC hash has poor uniformity, so swap the halves
+ // for anyone who only uses the first several bits of the hash.
+ hash = (hash << 16) | (hash >> 16);
+ return hash;
+ }
+
+ /// CrcHash() specialized for 1-byte data
+ static inline uint32_t CrcHash1(const void* v, uint32_t hash) {
+ DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+ const uint8_t* s = reinterpret_cast<const uint8_t*>(v);
+ hash = SSE4_crc32_u8(hash, *s);
+ hash = (hash << 16) | (hash >> 16);
+ return hash;
+ }
+
+ /// CrcHash() specialized for 2-byte data
+ static inline uint32_t CrcHash2(const void* v, uint32_t hash) {
+ DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+ const uint16_t* s = reinterpret_cast<const uint16_t*>(v);
+ hash = SSE4_crc32_u16(hash, *s);
+ hash = (hash << 16) | (hash >> 16);
+ return hash;
+ }
+
+ /// CrcHash() specialized for 4-byte data
+ static inline uint32_t CrcHash4(const void* v, uint32_t hash) {
+ DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+ const uint32_t* p = reinterpret_cast<const uint32_t*>(v);
+ hash = SSE4_crc32_u32(hash, *p);
+ hash = (hash << 16) | (hash >> 16);
+ return hash;
+ }
+
+ /// CrcHash() specialized for 8-byte data
+ static inline uint32_t CrcHash8(const void* v, uint32_t hash) {
+ DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+ const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+ hash = SSE4_crc32_u64(hash, *p);
+ hash = (hash << 16) | (hash >> 16);
+ return hash;
+ }
+
+ /// CrcHash() specialized for 12-byte data
+ static inline uint32_t CrcHash12(const void* v, uint32_t hash) {
+ DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+ const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+ hash = SSE4_crc32_u64(hash, *p);
+ ++p;
+ hash = SSE4_crc32_u32(hash, *reinterpret_cast<const uint32_t *>(p));
+ hash = (hash << 16) | (hash >> 16);
+ return hash;
+ }
+
+ /// CrcHash() specialized for 16-byte data
+ static inline uint32_t CrcHash16(const void* v, uint32_t hash) {
+ DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+ const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+ hash = SSE4_crc32_u64(hash, *p);
+ ++p;
+ hash = SSE4_crc32_u64(hash, *p);
+ hash = (hash << 16) | (hash >> 16);
+ return hash;
+ }
+
+ static const uint64_t MURMUR_PRIME = 0xc6a4a7935bd1e995;
+ static const int MURMUR_R = 47;
+
+ /// Murmur2 hash implementation returning 64-bit hashes.
+ static uint64_t MurmurHash2_64(const void* input, int len, uint64_t seed) {
+ uint64_t h = seed ^ (len * MURMUR_PRIME);
+
+ const uint64_t* data = reinterpret_cast<const uint64_t*>(input);
+ const uint64_t* end = data + (len / sizeof(uint64_t));
+
+ while (data != end) {
+ uint64_t k = *data++;
+ k *= MURMUR_PRIME;
+ k ^= k >> MURMUR_R;
+ k *= MURMUR_PRIME;
+ h ^= k;
+ h *= MURMUR_PRIME;
+ }
+
+ const uint8_t* data2 = reinterpret_cast<const uint8_t*>(data);
+ switch (len & 7) {
+ case 7: h ^= uint64_t(data2[6]) << 48;
+ case 6: h ^= uint64_t(data2[5]) << 40;
+ case 5: h ^= uint64_t(data2[4]) << 32;
+ case 4: h ^= uint64_t(data2[3]) << 24;
+ case 3: h ^= uint64_t(data2[2]) << 16;
+ case 2: h ^= uint64_t(data2[1]) << 8;
+ case 1: h ^= uint64_t(data2[0]);
+ h *= MURMUR_PRIME;
+ }
+
+ h ^= h >> MURMUR_R;
+ h *= MURMUR_PRIME;
+ h ^= h >> MURMUR_R;
+ return h;
+ }
+
+ /// default values recommended by http://isthe.com/chongo/tech/comp/fnv/
+ static const uint32_t FNV_PRIME = 0x01000193; // 16777619
+ static const uint32_t FNV_SEED = 0x811C9DC5; // 2166136261
+ static const uint64_t FNV64_PRIME = 1099511628211UL;
+ static const uint64_t FNV64_SEED = 14695981039346656037UL;
+
+ /// Implementation of the Fowler-Noll-Vo hash function. This is not as performant
+ /// as boost's hash on int types (2x slower) but has better bit entropy.
+ /// For ints, boost just returns the value of the int which can be pathological.
+ /// For example, if the data is <1000, 2000, 3000, 4000, ..> and then the mod of 1000
+ /// is taken on the hash, all values will collide to the same bucket.
+ /// For string values, Fnv is slightly faster than boost.
+ /// IMPORTANT: FNV hash suffers from poor diffusion of the least significant bit,
+ /// which can lead to poor results when input bytes are duplicated.
+ /// See FnvHash64to32() for how this can be mitigated.
+ static uint64_t FnvHash64(const void* data, int32_t bytes, uint64_t hash) {
+ const uint8_t* ptr = reinterpret_cast<const uint8_t*>(data);
+ while (bytes--) {
+ hash = (*ptr ^ hash) * FNV64_PRIME;
+ ++ptr;
+ }
+ return hash;
+ }
+
+ /// Return a 32-bit hash computed by invoking FNV-64 and folding the result to 32-bits.
+ /// This technique is recommended instead of FNV-32 since the LSB of an FNV hash is the
+ /// XOR of the LSBs of its input bytes, leading to poor results for duplicate inputs.
+ /// The input seed 'hash' is duplicated so the top half of the seed is not all zero.
+ /// Data length must be at least 1 byte: zero-length data should be handled separately,
+ /// for example using CombineHash with a unique constant value to avoid returning the
+ /// hash argument. Zero-length data gives terrible results: the initial hash value is
+ /// xored with itself cancelling all bits.
+ static uint32_t FnvHash64to32(const void* data, int32_t bytes, uint32_t hash) {
+ // IMPALA-2270: this function should never be used for zero-byte inputs.
+ DCHECK_GT(bytes, 0);
+ uint64_t hash_u64 = hash | ((uint64_t)hash << 32);
+ hash_u64 = FnvHash64(data, bytes, hash_u64);
+ return (hash_u64 >> 32) ^ (hash_u64 & 0xFFFFFFFF);
+ }
+
+ /// Computes the hash value for data. Will call either CrcHash or MurmurHash
+ /// depending on hardware capabilities.
+ /// Different steps of the query execution should use different seed values
+ /// to prevent accidental key collisions. (See IMPALA-219 for more details).
+ static uint32_t Hash(const void* data, int32_t bytes, uint32_t seed) {
+ if (LIKELY(CpuInfo::IsSupported(CpuInfo::SSE4_2))) {
+ return CrcHash(data, bytes, seed);
+ } else {
+ return MurmurHash2_64(data, bytes, seed);
+ }
+ }
+
+ /// The magic number (used in hash_combine()) 0x9e3779b9 = 2^32 / (golden ratio).
+ static const uint32_t HASH_COMBINE_SEED = 0x9e3779b9;
+
+ /// Combine hashes 'value' and 'seed' to get a new hash value. Similar to
+ /// boost::hash_combine(), but for uint32_t. This function should be used with a
+ /// constant first argument to update the hash value for zero-length values such as
+ /// NULL, boolean, and empty strings.
+ static inline uint32_t HashCombine32(uint32_t value, uint32_t seed) {
+ return seed ^ (HASH_COMBINE_SEED + value + (seed << 6) + (seed >> 2));
+ }
+
+ // Get 32 more bits of randomness from a 32-bit hash:
+ static inline uint32_t Rehash32to32(const uint32_t hash) {
+ // Constants generated by uuidgen(1) with the -r flag
+ static const uint64_t m = 0x7850f11ec6d14889ull, a = 0x6773610597ca4c63ull;
+ // This is strongly universal hashing following Dietzfelbinger's "Universal hashing
+ // and k-wise independent random variables via integer arithmetic without primes". As
+ // such, for any two distinct uint32_t's hash1 and hash2, the probability (over the
+ // randomness of the constants) that any subset of bit positions of
+ // Rehash32to32(hash1) is equal to the same subset of bit positions of
+ // Rehash32to32(hash2) is minimal.
+ return (static_cast<uint64_t>(hash) * m + a) >> 32;
+ }
+
+ static inline uint64_t Rehash32to64(const uint32_t hash) {
+ static const uint64_t m1 = 0x47b6137a44974d91ull, m2 = 0x8824ad5ba2b7289cull,
+ a1 = 0x705495c62df1424aull, a2 = 0x9efc49475c6bfb31ull;
+ const uint64_t hash1 = (static_cast<uint64_t>(hash) * m1 + a1) >> 32;
+ const uint64_t hash2 = (static_cast<uint64_t>(hash) * m2 + a2) >> 32;
+ return hash1 | (hash2 << 32);
+ }
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_HASH_UTIL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool-test.cc b/src/parquet/util/mem-pool-test.cc
new file mode 100644
index 0000000..de0b399
--- /dev/null
+++ b/src/parquet/util/mem-pool-test.cc
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Initially imported from Apache Impala on 2016-02-23, and has been modified
+// since for parquet-cpp
+
+#include <gtest/gtest.h>
+#include <cstdint>
+#include <cstring>
+#include <limits>
+#include <string>
+
+#include "parquet/util/mem-pool.h"
+#include "parquet/util/bit-util.h"
+
+namespace parquet_cpp {
+
+// Utility class to call private functions on MemPool and re-export its chunk-size constants.
+class MemPoolTest {  // NOTE(review): presumably a friend of MemPool -- confirm in mem-pool.h
+ public:
+ static bool CheckIntegrity(MemPool* pool, bool current_chunk_empty) {
+ return pool->CheckIntegrity(current_chunk_empty);  // plain forwarder; result returned unchanged
+ }
+
+ static const int INITIAL_CHUNK_SIZE = MemPool::INITIAL_CHUNK_SIZE;  // copy of MemPool's value
+ static const int MAX_CHUNK_SIZE = MemPool::MAX_CHUNK_SIZE;  // copy of MemPool's value
+};
+
+const int MemPoolTest::INITIAL_CHUNK_SIZE;  // out-of-class definition, needed if the constant is odr-used
+const int MemPoolTest::MAX_CHUNK_SIZE;  // out-of-class definition, needed if the constant is odr-used
+
+TEST(MemPoolTest, Basic) {  // Walks Allocate/AcquireData/Clear/FreeAll and checks the byte accounting.
+ MemPool p;
+ MemPool p2;
+ MemPool p3;
+
+ for (int iter = 0; iter < 2; ++iter) {  // second pass reuses the three pools after FreeAll()
+ // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
+ for (int i = 0; i < 768; ++i) {
+ // pads to 32 bytes
+ p.Allocate(25);
+ }
+ // we handed out 24K (768 * 32 bytes)
+ EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
+ // .. and allocated 28K of chunks (4, 8, 16)
+ EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
+
+ // we're passing on the first two chunks, containing 12K of data; we're left with
+ // one chunk of 16K containing 12K of data
+ p2.AcquireData(&p, true);
+ EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
+ EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
+
+ // we allocate 8K, for which there isn't enough room in the current chunk,
+ // so another one is allocated (32K)
+ p.Allocate(8 * 1024);
+ EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
+
+ // we allocate 65K, which doesn't fit into the current chunk or the default
+ // size of the next allocated chunk (64K)
+ p.Allocate(65 * 1024);
+ EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
+ if (iter == 0) {
+ EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+ } else {  // second pass: the 154K peak expected at the end of the first pass still stands
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ }
+ EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // Clear() resets allocated data, but doesn't remove any chunks or lower the peak
+ p.Clear();
+ EXPECT_EQ(0, p.total_allocated_bytes());
+ if (iter == 0) {
+ EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+ } else {
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ }
+ EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // next allocation reuses existing chunks
+ p.Allocate(1024);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ if (iter == 0) {
+ EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+ } else {
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ }
+ EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // ... unless it doesn't fit into any available chunk (a new 130K chunk is expected)
+ p.Allocate(120 * 1024);
+ EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
+ if (iter == 0) {
+ EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
+ } else {
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ }
+ EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // ... Try another allocation that fits into an existing chunk
+ p.Allocate(33 * 1024);
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
+ EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // we're releasing all of p's chunks, which get added to p2
+ p2.AcquireData(&p, false);
+ EXPECT_EQ(0, p.total_allocated_bytes());
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ EXPECT_EQ(0, p.GetTotalChunkSizes());
+
+ p3.AcquireData(&p2, true); // we're keeping the 65k chunk
+ EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
+ EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
+
+ p.FreeAll();
+ p2.FreeAll();
+ p3.FreeAll();
+ }
+}
+
+// Test that we can keep an allocated chunk and a free chunk.
+// This case verifies that when chunks are acquired by another memory pool the
+// remaining chunks are consistent if there was more than one used chunk and some
+// free chunks.
+TEST(MemPoolTest, Keep) {
+ MemPool p;
+ p.Allocate(4*1024);
+ p.Allocate(8*1024);
+ p.Allocate(16*1024);
+ EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
+ EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+ p.Clear();  // allocated bytes drop to 0; the chunk total is expected to stay at 28K
+ EXPECT_EQ(0, p.total_allocated_bytes());
+ EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+ p.Allocate(1*1024);
+ p.Allocate(4*1024);
+ EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
+ EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+
+ MemPool p2;
+ p2.AcquireData(&p, true);
+ EXPECT_EQ(4 * 1024, p.total_allocated_bytes());  // p is expected to keep the 4K allocation
+ EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());  // ... along with the 8K and 16K chunks
+ EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());  // p2 is expected to hold the 1K allocation
+ EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());  // ... in the 4K chunk
+
+ p.FreeAll();
+ p2.FreeAll();
+}
+
+// Tests that ReturnPartialAllocation() hands the tail of the last allocation back for reuse.
+TEST(MemPoolTest, ReturnPartial) {
+ MemPool p;
+ uint8_t* ptr = p.Allocate(1024);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ std::memset(ptr, 0, 1024);
+ p.ReturnPartialAllocation(1024);  // give the whole allocation back
+
+ uint8_t* ptr2 = p.Allocate(1024);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ EXPECT_TRUE(ptr == ptr2);  // the returned bytes are handed out again
+ p.ReturnPartialAllocation(1016);  // keep only the first 8 bytes
+
+ ptr2 = p.Allocate(1016);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ EXPECT_TRUE(ptr2 == ptr + 8);
+ p.ReturnPartialAllocation(512);  // keep 504 of the 1016 bytes
+ std::memset(ptr2, 1, 1016 - 512);
+
+ uint8_t* ptr3 = p.Allocate(512);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ EXPECT_TRUE(ptr3 == ptr + 512);
+ std::memset(ptr3, 2, 512);
+
+ for (int i = 0; i < 8; ++i) {  // bytes kept from the second allocation, still zero from the first fill
+ EXPECT_EQ(0, ptr[i]);
+ }
+ for (int i = 8; i < 512; ++i) {  // bytes kept from the third allocation
+ EXPECT_EQ(1, ptr[i]);
+ }
+ for (int i = 512; i < 1024; ++i) {  // bytes of the fourth allocation
+ EXPECT_EQ(2, ptr[i]);
+ }
+
+ p.FreeAll();
+}
+
+// Test that the MemPool overhead is bounded when we make allocations of
+// INITIAL_CHUNK_SIZE. The bounds are re-checked after every single allocation.
+TEST(MemPoolTest, MemoryOverhead) {
+ MemPool p;
+ const int alloc_size = MemPoolTest::INITIAL_CHUNK_SIZE;
+ const int num_allocs = 1000;
+ int64_t total_allocated = 0;  // bytes requested so far
+
+ for (int i = 0; i < num_allocs; ++i) {
+ uint8_t* mem = p.Allocate(alloc_size);
+ ASSERT_TRUE(mem != nullptr);
+ total_allocated += alloc_size;
+
+ int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;  // chunk bytes not handed out
+ // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
+ // one empty chunk at the end.
+ EXPECT_LE(wasted_memory, MemPoolTest::MAX_CHUNK_SIZE);
+ // The chunk doubling algorithm should not allocate chunks larger than the total
+ // amount of memory already allocated.
+ EXPECT_LE(wasted_memory, total_allocated);
+ }
+
+ p.FreeAll();
+}
+
+// Test that the MemPool overhead is bounded when we make alternating large and small
+// allocations (1 byte on even iterations, MAX_CHUNK_SIZE bytes on odd ones).
+TEST(MemPoolTest, FragmentationOverhead) {
+ MemPool p;
+ const int num_allocs = 100;
+ int64_t total_allocated = 0;  // bytes requested so far
+
+ for (int i = 0; i < num_allocs; ++i) {
+ int alloc_size = i % 2 == 0 ? 1 : MemPoolTest::MAX_CHUNK_SIZE;
+ uint8_t* mem = p.Allocate(alloc_size);
+ ASSERT_TRUE(mem != nullptr);
+ total_allocated += alloc_size;
+
+ int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;  // chunk bytes not handed out
+ // Fragmentation should not waste more than half of each completed chunk.
+ EXPECT_LE(wasted_memory, total_allocated + MemPoolTest::MAX_CHUNK_SIZE);
+ }
+
+ p.FreeAll();
+}
+
+} // namespace parquet_cpp