You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink) was not preserved in this plain-text copy.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/07/31 15:15:00 UTC
[3/5] parquet-cpp git commit: PARQUET-1068: Modify .clang-format to use straight Google format with 90-character line width
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_page.h b/src/parquet/column_page.h
index 7840612..85e3bb5 100644
--- a/src/parquet/column_page.h
+++ b/src/parquet/column_page.h
@@ -62,9 +62,9 @@ class Page {
class DataPage : public Page {
public:
DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
- Encoding::type encoding, Encoding::type definition_level_encoding,
- Encoding::type repetition_level_encoding,
- const EncodedStatistics& statistics = EncodedStatistics())
+ Encoding::type encoding, Encoding::type definition_level_encoding,
+ Encoding::type repetition_level_encoding,
+ const EncodedStatistics& statistics = EncodedStatistics())
: Page(buffer, PageType::DATA_PAGE),
num_values_(num_values),
encoding_(encoding),
@@ -93,11 +93,11 @@ class DataPage : public Page {
class CompressedDataPage : public DataPage {
public:
CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
- Encoding::type encoding, Encoding::type definition_level_encoding,
- Encoding::type repetition_level_encoding, int64_t uncompressed_size,
- const EncodedStatistics& statistics = EncodedStatistics())
+ Encoding::type encoding, Encoding::type definition_level_encoding,
+ Encoding::type repetition_level_encoding, int64_t uncompressed_size,
+ const EncodedStatistics& statistics = EncodedStatistics())
: DataPage(buffer, num_values, encoding, definition_level_encoding,
- repetition_level_encoding, statistics),
+ repetition_level_encoding, statistics),
uncompressed_size_(uncompressed_size) {}
int64_t uncompressed_size() const { return uncompressed_size_; }
@@ -109,8 +109,9 @@ class CompressedDataPage : public DataPage {
class DataPageV2 : public Page {
public:
DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
- int32_t num_rows, Encoding::type encoding, int32_t definition_levels_byte_length,
- int32_t repetition_levels_byte_length, bool is_compressed = false)
+ int32_t num_rows, Encoding::type encoding,
+ int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
+ bool is_compressed = false)
: Page(buffer, PageType::DATA_PAGE_V2),
num_values_(num_values),
num_nulls_(num_nulls),
@@ -149,7 +150,7 @@ class DataPageV2 : public Page {
class DictionaryPage : public Page {
public:
DictionaryPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
- Encoding::type encoding, bool is_sorted = false)
+ Encoding::type encoding, bool is_sorted = false)
: Page(buffer, PageType::DICTIONARY_PAGE),
num_values_(num_values),
encoding_(encoding),
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader-test.cc b/src/parquet/column_reader-test.cc
index 84d1e37..2599325 100644
--- a/src/parquet/column_reader-test.cc
+++ b/src/parquet/column_reader-test.cc
@@ -44,8 +44,10 @@ namespace test {
template <typename T>
static inline bool vector_equal_with_def_levels(const vector<T>& left,
- const vector<int16_t>& def_levels, int16_t max_def_levels, int16_t max_rep_levels,
- const vector<T>& right) {
+ const vector<int16_t>& def_levels,
+ int16_t max_def_levels,
+ int16_t max_rep_levels,
+ const vector<T>& right) {
size_t i_left = 0;
size_t i_right = 0;
for (size_t i = 0; i < def_levels.size(); i++) {
@@ -63,7 +65,9 @@ static inline bool vector_equal_with_def_levels(const vector<T>& left,
i_right++;
} else if (def_levels[i] < (max_def_levels - 1)) {
// Null entry on a higher nesting level, only supported for non-repeating data
- if (max_rep_levels == 0) { i_right++; }
+ if (max_rep_levels == 0) {
+ i_right++;
+ }
}
}
@@ -93,8 +97,9 @@ class TestPrimitiveReader : public ::testing::Test {
// 1) batch_size < page_size (multiple ReadBatch from a single page)
// 2) batch_size > page_size (BatchRead limits to a single page)
do {
- batch = static_cast<int>(reader->ReadBatch(batch_size, &dresult[0] + batch_actual,
- &rresult[0] + batch_actual, &vresult[0] + total_values_read, &values_read));
+ batch = static_cast<int>(reader->ReadBatch(
+ batch_size, &dresult[0] + batch_actual, &rresult[0] + batch_actual,
+ &vresult[0] + total_values_read, &values_read));
total_values_read += static_cast<int>(values_read);
batch_actual += batch;
batch_size = std::max(batch_size * 2, 4096);
@@ -103,8 +108,12 @@ class TestPrimitiveReader : public ::testing::Test {
ASSERT_EQ(num_levels_, batch_actual);
ASSERT_EQ(num_values_, total_values_read);
ASSERT_TRUE(vector_equal(values_, vresult));
- if (max_def_level_ > 0) { ASSERT_TRUE(vector_equal(def_levels_, dresult)); }
- if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
+ if (max_def_level_ > 0) {
+ ASSERT_TRUE(vector_equal(def_levels_, dresult));
+ }
+ if (max_rep_level_ > 0) {
+ ASSERT_TRUE(vector_equal(rep_levels_, rresult));
+ }
// catch improper writes at EOS
batch_actual =
static_cast<int>(reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read));
@@ -131,8 +140,8 @@ class TestPrimitiveReader : public ::testing::Test {
// 1) batch_size < page_size (multiple ReadBatch from a single page)
// 2) batch_size > page_size (BatchRead limits to a single page)
do {
- batch = static_cast<int>(reader->ReadBatchSpaced(batch_size,
- dresult.data() + levels_actual, rresult.data() + levels_actual,
+ batch = static_cast<int>(reader->ReadBatchSpaced(
+ batch_size, dresult.data() + levels_actual, rresult.data() + levels_actual,
vresult.data() + batch_actual, valid_bits.data() + batch_actual, 0,
&levels_read, &values_read, &null_count));
total_values_read += batch - static_cast<int>(null_count);
@@ -145,15 +154,18 @@ class TestPrimitiveReader : public ::testing::Test {
ASSERT_EQ(num_values_, total_values_read);
if (max_def_level_ > 0) {
ASSERT_TRUE(vector_equal(def_levels_, dresult));
- ASSERT_TRUE(vector_equal_with_def_levels(
- values_, dresult, max_def_level_, max_rep_level_, vresult));
+ ASSERT_TRUE(vector_equal_with_def_levels(values_, dresult, max_def_level_,
+ max_rep_level_, vresult));
} else {
ASSERT_TRUE(vector_equal(values_, vresult));
}
- if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
+ if (max_rep_level_ > 0) {
+ ASSERT_TRUE(vector_equal(rep_levels_, rresult));
+ }
// catch improper writes at EOS
- batch_actual = static_cast<int>(reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr,
- valid_bits.data(), 0, &levels_read, &values_read, &null_count));
+ batch_actual = static_cast<int>(
+ reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr, valid_bits.data(), 0,
+ &levels_read, &values_read, &null_count));
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, null_count);
}
@@ -167,15 +179,17 @@ class TestPrimitiveReader : public ::testing::Test {
}
void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
- num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
- rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
+ num_values_ =
+ MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
+ values_, data_buffer_, pages_, Encoding::PLAIN);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResults();
Clear();
- num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
- rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
+ num_values_ =
+ MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
+ values_, data_buffer_, pages_, Encoding::PLAIN);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResultsSpaced();
@@ -183,15 +197,17 @@ class TestPrimitiveReader : public ::testing::Test {
}
void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
- num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
- rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
+ num_values_ =
+ MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
+ values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResults();
Clear();
- num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
- rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
+ num_values_ =
+ MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
+ values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
num_levels_ = num_pages * levels_per_page;
InitReader(d);
CheckResultsSpaced();
@@ -252,7 +268,7 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
NodePtr type = schema::Int32("b", Repetition::REQUIRED);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_,
- values_, data_buffer_, pages_, Encoding::PLAIN);
+ values_, data_buffer_, pages_, Encoding::PLAIN);
InitReader(&descr);
vector<int32_t> vresult(levels_per_page / 2, -1);
vector<int16_t> dresult(levels_per_page / 2, -1);
@@ -266,9 +282,10 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
int64_t levels_skipped = reader->Skip(2 * levels_per_page);
ASSERT_EQ(2 * levels_per_page, levels_skipped);
// Read half a page
- reader->ReadBatch(
- levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
- vector<int32_t> sub_values(values_.begin() + 2 * levels_per_page,
+ reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(),
+ &values_read);
+ vector<int32_t> sub_values(
+ values_.begin() + 2 * levels_per_page,
values_.begin() + static_cast<int>(2.5 * static_cast<double>(levels_per_page)));
ASSERT_TRUE(vector_equal(sub_values, vresult));
@@ -276,10 +293,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
levels_skipped = reader->Skip(levels_per_page);
ASSERT_EQ(levels_per_page, levels_skipped);
// Read half a page
- reader->ReadBatch(
- levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
+ reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(),
+ &values_read);
sub_values.clear();
- sub_values.insert(sub_values.end(),
+ sub_values.insert(
+ sub_values.end(),
values_.begin() + static_cast<int>(3.5 * static_cast<double>(levels_per_page)),
values_.begin() + 4 * levels_per_page);
ASSERT_TRUE(vector_equal(sub_values, vresult));
@@ -289,10 +307,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
levels_skipped = reader->Skip(levels_per_page / 2);
ASSERT_EQ(0.5 * levels_per_page, levels_skipped);
// Read half a page
- reader->ReadBatch(
- levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
+ reader->ReadBatch(levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(),
+ &values_read);
sub_values.clear();
- sub_values.insert(sub_values.end(),
+ sub_values.insert(
+ sub_values.end(),
values_.begin() + static_cast<int>(4.5 * static_cast<double>(levels_per_page)),
values_.end());
ASSERT_TRUE(vector_equal(sub_values, vresult));
@@ -323,8 +342,8 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
pages_.clear();
dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
- data_page = MakeDataPage<Int32Type>(
- &descr, {}, 0, Encoding::PLAIN_DICTIONARY, {}, 0, {}, 0, {}, 0);
+ data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::PLAIN_DICTIONARY, {}, 0,
+ {}, 0, {}, 0);
pages_.push_back(dict_page);
pages_.push_back(data_page);
InitReader(&descr);
@@ -332,8 +351,8 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
ASSERT_NO_THROW(reader_->HasNext());
pages_.clear();
- data_page = MakeDataPage<Int32Type>(
- &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
+ data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {},
+ 0, {}, 0);
pages_.push_back(data_page);
InitReader(&descr);
// Tests dictionary page must occur before data page
@@ -358,8 +377,8 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
ASSERT_THROW(reader_->HasNext(), ParquetException);
pages_.clear();
- data_page = MakeDataPage<Int32Type>(
- &descr, {}, 0, Encoding::DELTA_BYTE_ARRAY, {}, 0, {}, 0, {}, 0);
+ data_page = MakeDataPage<Int32Type>(&descr, {}, 0, Encoding::DELTA_BYTE_ARRAY, {}, 0,
+ {}, 0, {}, 0);
pages_.push_back(data_page);
InitReader(&descr);
// unsupported encoding
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index ce6936d..5f6259f 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -36,7 +36,7 @@ LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
LevelDecoder::~LevelDecoder() {}
int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
- int num_buffered_values, const uint8_t* data) {
+ int num_buffered_values, const uint8_t* data) {
int32_t num_bytes = 0;
encoding_ = encoding;
num_values_remaining_ = num_buffered_values;
@@ -86,8 +86,8 @@ ReaderProperties default_reader_properties() {
return default_reader_properties;
}
-ColumnReader::ColumnReader(
- const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool)
+ColumnReader::ColumnReader(const ColumnDescriptor* descr,
+ std::unique_ptr<PageReader> pager, MemoryPool* pool)
: descr_(descr),
pager_(std::move(pager)),
num_buffered_values_(0),
@@ -193,7 +193,9 @@ bool TypedColumnReader<DType>::ReadNewPage() {
// first page with this encoding.
Encoding::type encoding = page->encoding();
- if (IsDictionaryIndexEncoding(encoding)) { encoding = Encoding::RLE_DICTIONARY; }
+ if (IsDictionaryIndexEncoding(encoding)) {
+ encoding = Encoding::RLE_DICTIONARY;
+ }
auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
@@ -221,8 +223,8 @@ bool TypedColumnReader<DType>::ReadNewPage() {
throw ParquetException("Unknown encoding type.");
}
}
- current_decoder_->SetData(
- static_cast<int>(num_buffered_values_), buffer, static_cast<int>(data_size));
+ current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
+ static_cast<int>(data_size));
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data
@@ -237,20 +239,25 @@ bool TypedColumnReader<DType>::ReadNewPage() {
// Batch read APIs
int64_t ColumnReader::ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
- if (descr_->max_definition_level() == 0) { return 0; }
+ if (descr_->max_definition_level() == 0) {
+ return 0;
+ }
return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
}
int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
- if (descr_->max_repetition_level() == 0) { return 0; }
+ if (descr_->max_repetition_level() == 0) {
+ return 0;
+ }
return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
}
// ----------------------------------------------------------------------
// Dynamic column reader constructor
-std::shared_ptr<ColumnReader> ColumnReader::Make(
- const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool) {
+std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr,
+ std::unique_ptr<PageReader> pager,
+ MemoryPool* pool) {
switch (descr->physical_type()) {
case Type::BOOLEAN:
return std::make_shared<BoolReader>(descr, std::move(pager), pool);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index e733d67..df7deb8 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -53,7 +53,7 @@ class PARQUET_EXPORT LevelDecoder {
// Initialize the LevelDecoder state with new data
// and return the number of bytes consumed
int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
- const uint8_t* data);
+ const uint8_t* data);
// Decodes a batch of levels into an array and returns the number of levels decoded
int Decode(int batch_size, int16_t* levels);
@@ -69,11 +69,11 @@ class PARQUET_EXPORT LevelDecoder {
class PARQUET_EXPORT ColumnReader {
public:
ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
virtual ~ColumnReader();
- static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor* descr,
- std::unique_ptr<PageReader> pager,
+ static std::shared_ptr<ColumnReader> Make(
+ const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
// Returns true if there are still values in this column.
@@ -81,7 +81,9 @@ class PARQUET_EXPORT ColumnReader {
// Either there is no data page available yet, or the data page has been
// exhausted
if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
- if (!ReadNewPage() || num_buffered_values_ == 0) { return false; }
+ if (!ReadNewPage() || num_buffered_values_ == 0) {
+ return false;
+ }
}
return true;
}
@@ -135,7 +137,7 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
typedef typename DType::c_type T;
TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
: ColumnReader(schema, std::move(pager), pool), current_decoder_(NULL) {}
virtual ~TypedColumnReader() {}
@@ -157,7 +159,7 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
//
// @returns: actual number of levels read (see values_read for number of values read)
int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- T* values, int64_t* values_read);
+ T* values, int64_t* values_read);
/// Read a batch of repetition levels, definition levels, and values from the
/// column and leave spaces for null entries on the lowest level in the values
@@ -194,8 +196,9 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
/// @param[out] null_count The number of nulls on the lowest levels.
/// (i.e. (values_read - null_count) is total number of non-null entries)
int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
- int64_t* values_read, int64_t* null_count);
+ T* values, uint8_t* valid_bits, int64_t valid_bits_offset,
+ int64_t* levels_read, int64_t* values_read,
+ int64_t* null_count);
// Skip reading levels
// Returns the number of levels skipped
@@ -219,7 +222,7 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
//
// @returns: the number of values read into the out buffer
int64_t ReadValuesSpaced(int64_t batch_size, T* out, int null_count,
- uint8_t* valid_bits, int64_t valid_bits_offset);
+ uint8_t* valid_bits, int64_t valid_bits_offset);
// Map of encoding type to the respective decoder object. For example, a
// column chunk's data pages may include both dictionary-encoded and
@@ -239,14 +242,18 @@ inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out)
template <typename DType>
inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
- int null_count, uint8_t* valid_bits, int64_t valid_bits_offset) {
- return current_decoder_->DecodeSpaced(
- out, static_cast<int>(batch_size), null_count, valid_bits, valid_bits_offset);
+ int null_count,
+ uint8_t* valid_bits,
+ int64_t valid_bits_offset) {
+ return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size), null_count,
+ valid_bits, valid_bits_offset);
}
template <typename DType>
inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
- int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) {
+ int16_t* def_levels,
+ int16_t* rep_levels, T* values,
+ int64_t* values_read) {
// HasNext invokes ReadNewPage
if (!HasNext()) {
*values_read = 0;
@@ -268,7 +275,9 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
for (int64_t i = 0; i < num_def_levels; ++i) {
- if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; }
+ if (def_levels[i] == descr_->max_definition_level()) {
+ ++values_to_read;
+ }
}
} else {
// Required field, read all values
@@ -291,8 +300,10 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
}
inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
- int16_t max_definition_level, int16_t max_repetition_level, int64_t* values_read,
- int64_t* null_count, uint8_t* valid_bits, int64_t valid_bits_offset) {
+ int16_t max_definition_level,
+ int16_t max_repetition_level, int64_t* values_read,
+ int64_t* null_count, uint8_t* valid_bits,
+ int64_t valid_bits_offset) {
int byte_offset = static_cast<int>(valid_bits_offset) / 8;
int bit_offset = static_cast<int>(valid_bits_offset) % 8;
uint8_t bitset = valid_bits[byte_offset];
@@ -330,15 +341,17 @@ inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_
bitset = valid_bits[byte_offset];
}
}
- if (bit_offset != 0) { valid_bits[byte_offset] = bitset; }
+ if (bit_offset != 0) {
+ valid_bits[byte_offset] = bitset;
+ }
*values_read = (bit_offset + byte_offset * 8 - valid_bits_offset);
}
template <typename DType>
-inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
- int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits,
- int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read,
- int64_t* null_count_out) {
+inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(
+ int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
+ uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
+ int64_t* values_read, int64_t* null_count_out) {
// HasNext invokes ReadNewPage
if (!HasNext()) {
*levels_read = 0;
@@ -388,7 +401,9 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
if (!has_spaced_values) {
int values_to_read = 0;
for (int64_t i = 0; i < num_def_levels; ++i) {
- if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; }
+ if (def_levels[i] == descr_->max_definition_level()) {
+ ++values_to_read;
+ }
}
total_values = ReadValues(values_to_read, values);
for (int64_t i = 0; i < total_values; i++) {
@@ -399,9 +414,10 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
int16_t max_definition_level = descr_->max_definition_level();
int16_t max_repetition_level = descr_->max_repetition_level();
DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
- max_repetition_level, values_read, &null_count, valid_bits, valid_bits_offset);
+ max_repetition_level, values_read, &null_count, valid_bits,
+ valid_bits_offset);
total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
- valid_bits, valid_bits_offset);
+ valid_bits, valid_bits_offset);
}
*levels_read = num_def_levels;
*null_count_out = null_count;
@@ -446,9 +462,9 @@ inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
do {
batch_size = std::min(batch_size, rows_to_skip);
values_read = ReadBatch(static_cast<int>(batch_size),
- reinterpret_cast<int16_t*>(def_levels->mutable_data()),
- reinterpret_cast<int16_t*>(rep_levels->mutable_data()),
- reinterpret_cast<T*>(vals->mutable_data()), &values_read);
+ reinterpret_cast<int16_t*>(def_levels->mutable_data()),
+ reinterpret_cast<int16_t*>(rep_levels->mutable_data()),
+ reinterpret_cast<T*>(vals->mutable_data()), &values_read);
rows_to_skip -= values_read;
} while (values_read > 0 && rows_to_skip > 0);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner-test.cc b/src/parquet/column_scanner-test.cc
index 086722b..0cebdc0 100644
--- a/src/parquet/column_scanner-test.cc
+++ b/src/parquet/column_scanner-test.cc
@@ -43,8 +43,8 @@ using schema::NodePtr;
namespace test {
template <>
-void InitDictValues<bool>(
- int num_values, int dict_per_page, vector<bool>& values, vector<uint8_t>& buffer) {
+void InitDictValues<bool>(int num_values, int dict_per_page, vector<bool>& values,
+ vector<uint8_t>& buffer) {
// No op for bool
}
@@ -91,9 +91,9 @@ class TestFlatScanner : public ::testing::Test {
}
void Execute(int num_pages, int levels_per_page, int batch_size,
- const ColumnDescriptor* d, Encoding::type encoding) {
+ const ColumnDescriptor* d, Encoding::type encoding) {
num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
- values_, data_buffer_, pages_, encoding);
+ values_, data_buffer_, pages_, encoding);
num_levels_ = num_pages * levels_per_page;
InitScanner(d);
CheckResults(batch_size, d);
@@ -101,22 +101,22 @@ class TestFlatScanner : public ::testing::Test {
}
void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
- std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3,
- int length) {
+ std::shared_ptr<ColumnDescriptor>& d2,
+ std::shared_ptr<ColumnDescriptor>& d3, int length) {
NodePtr type;
- type = schema::PrimitiveNode::Make(
- "c1", Repetition::REQUIRED, Type::type_num, LogicalType::NONE, length);
+ type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::type_num,
+ LogicalType::NONE, length);
d1.reset(new ColumnDescriptor(type, 0, 0));
- type = schema::PrimitiveNode::Make(
- "c2", Repetition::OPTIONAL, Type::type_num, LogicalType::NONE, length);
+ type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, Type::type_num,
+ LogicalType::NONE, length);
d2.reset(new ColumnDescriptor(type, 4, 0));
- type = schema::PrimitiveNode::Make(
- "c3", Repetition::REPEATED, Type::type_num, LogicalType::NONE, length);
+ type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, Type::type_num,
+ LogicalType::NONE, length);
d3.reset(new ColumnDescriptor(type, 4, 2));
}
void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length,
- Encoding::type encoding = Encoding::PLAIN) {
+ Encoding::type encoding = Encoding::PLAIN) {
std::shared_ptr<ColumnDescriptor> d1;
std::shared_ptr<ColumnDescriptor> d2;
std::shared_ptr<ColumnDescriptor> d3;
@@ -145,7 +145,7 @@ static int num_pages = 20;
static int batch_size = 32;
typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
- ByteArrayType>
+ ByteArrayType>
TestTypes;
using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
@@ -158,8 +158,8 @@ TYPED_TEST(TestFlatScanner, TestPlainScanner) {
}
TYPED_TEST(TestFlatScanner, TestDictScanner) {
- this->ExecuteAll(
- num_pages, num_levels_per_page, batch_size, 0, Encoding::RLE_DICTIONARY);
+ this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0,
+ Encoding::RLE_DICTIONARY);
}
TEST_F(TestBooleanFlatScanner, TestPlainScanner) {
@@ -171,33 +171,35 @@ TEST_F(TestFLBAFlatScanner, TestPlainScanner) {
}
TEST_F(TestFLBAFlatScanner, TestDictScanner) {
- this->ExecuteAll(
- num_pages, num_levels_per_page, batch_size, FLBA_LENGTH, Encoding::RLE_DICTIONARY);
+ this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH,
+ Encoding::RLE_DICTIONARY);
}
TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH,
- Encoding::PLAIN_DICTIONARY);
+ Encoding::PLAIN_DICTIONARY);
}
// PARQUET 502
TEST_F(TestFLBAFlatScanner, TestSmallBatch) {
- NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+ NodePtr type =
+ schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY,
+ LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 0, 0);
- num_values_ = MakePages<FLBAType>(
- &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
+ num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
+ data_buffer_, pages_);
num_levels_ = 1 * 100;
InitScanner(&d);
CheckResults(1, &d);
}
TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {
- NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+ NodePtr type =
+ schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
+ LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 4, 0);
- num_values_ = MakePages<FLBAType>(
- &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
+ num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
+ data_buffer_, pages_);
num_levels_ = 1 * 100;
InitScanner(&d);
TypedScanner<FLBAType>* scanner =
@@ -208,11 +210,12 @@ TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {
}
TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) {
- NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+ NodePtr type =
+ schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY,
+ LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
const ColumnDescriptor d(type, 4, 0);
- num_values_ = MakePages<FLBAType>(
- &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
+ num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
+ data_buffer_, pages_);
num_levels_ = 1 * 100;
InitScanner(&d);
TypedScanner<FLBAType>* scanner =
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner.cc b/src/parquet/column_scanner.cc
index a67af71..51c8773 100644
--- a/src/parquet/column_scanner.cc
+++ b/src/parquet/column_scanner.cc
@@ -27,8 +27,8 @@ using arrow::MemoryPool;
namespace parquet {
-std::shared_ptr<Scanner> Scanner::Make(
- std::shared_ptr<ColumnReader> col_reader, int64_t batch_size, MemoryPool* pool) {
+std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader,
+ int64_t batch_size, MemoryPool* pool) {
switch (col_reader->type()) {
case Type::BOOLEAN:
return std::make_shared<BoolScanner>(col_reader, batch_size, pool);
@@ -54,32 +54,33 @@ std::shared_ptr<Scanner> Scanner::Make(
}
int64_t ScanAllValues(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- uint8_t* values, int64_t* values_buffered, parquet::ColumnReader* reader) {
+ uint8_t* values, int64_t* values_buffered,
+ parquet::ColumnReader* reader) {
switch (reader->type()) {
case parquet::Type::BOOLEAN:
- return ScanAll<parquet::BoolReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
+ return ScanAll<parquet::BoolReader>(batch_size, def_levels, rep_levels, values,
+ values_buffered, reader);
case parquet::Type::INT32:
- return ScanAll<parquet::Int32Reader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
+ return ScanAll<parquet::Int32Reader>(batch_size, def_levels, rep_levels, values,
+ values_buffered, reader);
case parquet::Type::INT64:
- return ScanAll<parquet::Int64Reader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
+ return ScanAll<parquet::Int64Reader>(batch_size, def_levels, rep_levels, values,
+ values_buffered, reader);
case parquet::Type::INT96:
- return ScanAll<parquet::Int96Reader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
+ return ScanAll<parquet::Int96Reader>(batch_size, def_levels, rep_levels, values,
+ values_buffered, reader);
case parquet::Type::FLOAT:
- return ScanAll<parquet::FloatReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
+ return ScanAll<parquet::FloatReader>(batch_size, def_levels, rep_levels, values,
+ values_buffered, reader);
case parquet::Type::DOUBLE:
- return ScanAll<parquet::DoubleReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
+ return ScanAll<parquet::DoubleReader>(batch_size, def_levels, rep_levels, values,
+ values_buffered, reader);
case parquet::Type::BYTE_ARRAY:
- return ScanAll<parquet::ByteArrayReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
+ return ScanAll<parquet::ByteArrayReader>(batch_size, def_levels, rep_levels, values,
+ values_buffered, reader);
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
- return ScanAll<parquet::FixedLenByteArrayReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
+ return ScanAll<parquet::FixedLenByteArrayReader>(batch_size, def_levels, rep_levels,
+ values, values_buffered, reader);
default:
parquet::ParquetException::NYI("type reader not implemented");
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner.h b/src/parquet/column_scanner.h
index 4be0b7f..2917201 100644
--- a/src/parquet/column_scanner.h
+++ b/src/parquet/column_scanner.h
@@ -18,10 +18,10 @@
#ifndef PARQUET_COLUMN_SCANNER_H
#define PARQUET_COLUMN_SCANNER_H
+#include <stdio.h>
#include <cstdint>
#include <memory>
#include <ostream>
-#include <stdio.h>
#include <string>
#include <vector>
@@ -39,8 +39,8 @@ static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
class PARQUET_EXPORT Scanner {
public:
explicit Scanner(std::shared_ptr<ColumnReader> reader,
- int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
: batch_size_(batch_size),
level_offset_(0),
levels_buffered_(0),
@@ -54,7 +54,8 @@ class PARQUET_EXPORT Scanner {
virtual ~Scanner() {}
- static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
+ static std::shared_ptr<Scanner> Make(
+ std::shared_ptr<ColumnReader> col_reader,
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
@@ -90,8 +91,8 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
typedef typename DType::c_type T;
explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
- int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
: Scanner(reader, batch_size, pool) {
typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
int value_byte_size = type_traits<DType::type_num>::value_byte_size;
@@ -103,13 +104,15 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
bool NextLevels(int16_t* def_level, int16_t* rep_level) {
if (level_offset_ == levels_buffered_) {
- levels_buffered_ =
- static_cast<int>(typed_reader_->ReadBatch(static_cast<int>(batch_size_),
- def_levels_.data(), rep_levels_.data(), values_, &values_buffered_));
+ levels_buffered_ = static_cast<int>(
+ typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(),
+ rep_levels_.data(), values_, &values_buffered_));
value_offset_ = 0;
level_offset_ = 0;
- if (!levels_buffered_) { return false; }
+ if (!levels_buffered_) {
+ return false;
+ }
}
*def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
*rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
@@ -128,7 +131,9 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
NextLevels(def_level, rep_level);
*is_null = *def_level < descr()->max_definition_level();
- if (*is_null) { return true; }
+ if (*is_null) {
+ return true;
+ }
if (value_offset_ == values_buffered_) {
throw ParquetException("Value was non-null, but has not been buffered");
@@ -152,7 +157,9 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
NextLevels(&def_level, &rep_level);
*is_null = def_level < descr()->max_definition_level();
- if (*is_null) { return true; }
+ if (*is_null) {
+ return true;
+ }
if (value_offset_ == values_buffered_) {
throw ParquetException("Value was non-null, but has not been buffered");
@@ -166,7 +173,9 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
bool is_null = false;
char buffer[25];
- if (!NextValue(&val, &is_null)) { throw ParquetException("No more values buffered"); }
+ if (!NextValue(&val, &is_null)) {
+ throw ParquetException("No more values buffered");
+ }
if (is_null) {
std::string null_fmt = format_fwf<ByteArrayType>(width);
@@ -187,31 +196,31 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
};
template <typename DType>
-inline void TypedScanner<DType>::FormatValue(
- void* val, char* buffer, int bufsize, int width) {
+inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize,
+ int width) {
std::string fmt = format_fwf<DType>(width);
snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
}
template <>
-inline void TypedScanner<Int96Type>::FormatValue(
- void* val, char* buffer, int bufsize, int width) {
+inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize,
+ int width) {
std::string fmt = format_fwf<Int96Type>(width);
std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
}
template <>
-inline void TypedScanner<ByteArrayType>::FormatValue(
- void* val, char* buffer, int bufsize, int width) {
+inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize,
+ int width) {
std::string fmt = format_fwf<ByteArrayType>(width);
std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
}
template <>
-inline void TypedScanner<FLBAType>::FormatValue(
- void* val, char* buffer, int bufsize, int width) {
+inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize,
+ int width) {
std::string fmt = format_fwf<FLBAType>(width);
std::string result = FixedLenByteArrayToString(
*reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
@@ -229,17 +238,19 @@ typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
template <typename RType>
int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- uint8_t* values, int64_t* values_buffered, parquet::ColumnReader* reader) {
+ uint8_t* values, int64_t* values_buffered,
+ parquet::ColumnReader* reader) {
typedef typename RType::T Type;
auto typed_reader = static_cast<RType*>(reader);
auto vals = reinterpret_cast<Type*>(&values[0]);
- return typed_reader->ReadBatch(
- batch_size, def_levels, rep_levels, vals, values_buffered);
+ return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals,
+ values_buffered);
}
int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
- int16_t* rep_levels, uint8_t* values, int64_t* values_buffered,
- parquet::ColumnReader* reader);
+ int16_t* rep_levels, uint8_t* values,
+ int64_t* values_buffered,
+ parquet::ColumnReader* reader);
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 798c7ba..3ec3663 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -59,8 +59,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
Type::type type_num() { return TestType::type_num; }
- void BuildReader(
- int64_t num_rows, Compression::type compression = Compression::UNCOMPRESSED) {
+ void BuildReader(int64_t num_rows,
+ Compression::type compression = Compression::UNCOMPRESSED) {
auto buffer = sink_->GetBuffer();
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
std::unique_ptr<SerializedPageReader> page_reader(
@@ -93,8 +93,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
BuildReader(static_cast<int64_t>(this->values_out_.size()), compression);
reader_->ReadBatch(static_cast<int>(this->values_out_.size()),
- definition_levels_out_.data(), repetition_levels_out_.data(),
- this->values_out_ptr_, &values_read_);
+ definition_levels_out_.data(), repetition_levels_out_.data(),
+ this->values_out_ptr_, &values_read_);
this->SyncValuesOut();
}
@@ -105,22 +105,24 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
}
void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression,
- bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) {
+ bool enable_dictionary, bool enable_statistics,
+ int64_t num_rows = SMALL_SIZE) {
this->GenerateData(num_rows);
- this->WriteRequiredWithSettings(
- encoding, compression, enable_dictionary, enable_statistics, num_rows);
+ this->WriteRequiredWithSettings(encoding, compression, enable_dictionary,
+ enable_statistics, num_rows);
this->ReadAndCompare(compression, num_rows);
- this->WriteRequiredWithSettingsSpaced(
- encoding, compression, enable_dictionary, enable_statistics, num_rows);
+ this->WriteRequiredWithSettingsSpaced(encoding, compression, enable_dictionary,
+ enable_statistics, num_rows);
this->ReadAndCompare(compression, num_rows);
}
void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
- bool enable_dictionary, bool enable_statistics, int64_t num_rows) {
- ColumnProperties column_properties(
- encoding, compression, enable_dictionary, enable_statistics);
+ bool enable_dictionary, bool enable_statistics,
+ int64_t num_rows) {
+ ColumnProperties column_properties(encoding, compression, enable_dictionary,
+ enable_statistics);
std::shared_ptr<TypedColumnWriter<TestType>> writer =
this->BuildWriter(num_rows, column_properties);
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
@@ -130,16 +132,17 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
}
void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
- Compression::type compression, bool enable_dictionary, bool enable_statistics,
- int64_t num_rows) {
+ Compression::type compression,
+ bool enable_dictionary, bool enable_statistics,
+ int64_t num_rows) {
std::vector<uint8_t> valid_bits(
BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
- ColumnProperties column_properties(
- encoding, compression, enable_dictionary, enable_statistics);
+ ColumnProperties column_properties(encoding, compression, enable_dictionary,
+ enable_statistics);
std::shared_ptr<TypedColumnWriter<TestType>> writer =
this->BuildWriter(num_rows, column_properties);
- writer->WriteBatchSpaced(
- this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, this->values_ptr_);
+ writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0,
+ this->values_ptr_);
// The behaviour should be independent from the number of Close() calls
writer->Close();
writer->Close();
@@ -234,7 +237,7 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
uint8_t* data_ptr = data.data();
for (int64_t i = 0; i < values_read_recently; i++) {
memcpy(data_ptr + this->descr_->type_length() * i,
- this->values_out_[i + values_read_].ptr, this->descr_->type_length());
+ this->values_out_[i + values_read_].ptr, this->descr_->type_length());
this->values_out_[i + values_read_].ptr =
data_ptr + this->descr_->type_length() * i;
}
@@ -246,7 +249,7 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
}
typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
- BooleanType, ByteArrayType, FLBAType>
+ BooleanType, ByteArrayType, FLBAType>
TestTypes;
TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
@@ -288,38 +291,38 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
*/
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::SNAPPY, false, false, LARGE_SIZE);
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false,
+ LARGE_SIZE);
}
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::BROTLI, false, false, LARGE_SIZE);
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false,
+ LARGE_SIZE);
}
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::GZIP, false, false, LARGE_SIZE);
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false,
+ LARGE_SIZE);
}
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::UNCOMPRESSED, false, true, LARGE_SIZE);
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true,
+ LARGE_SIZE);
}
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::SNAPPY, false, true, LARGE_SIZE);
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true,
+ LARGE_SIZE);
}
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::BROTLI, false, true, LARGE_SIZE);
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, true,
+ LARGE_SIZE);
}
TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::GZIP, false, true, LARGE_SIZE);
+ this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true,
+ LARGE_SIZE);
}
TYPED_TEST(TestPrimitiveWriter, Optional) {
@@ -332,8 +335,8 @@ TYPED_TEST(TestPrimitiveWriter, Optional) {
definition_levels[1] = 0;
auto writer = this->BuildWriter();
- writer->WriteBatch(
- this->values_.size(), definition_levels.data(), nullptr, this->values_ptr_);
+ writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr,
+ this->values_ptr_);
writer->Close();
// PARQUET-703
@@ -362,7 +365,7 @@ TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) {
auto writer = this->BuildWriter();
writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
- valid_bits.data(), 0, this->values_ptr_);
+ valid_bits.data(), 0, this->values_ptr_);
writer->Close();
// PARQUET-703
@@ -387,7 +390,7 @@ TYPED_TEST(TestPrimitiveWriter, Repeated) {
auto writer = this->BuildWriter();
writer->WriteBatch(this->values_.size(), definition_levels.data(),
- repetition_levels.data(), this->values_ptr_);
+ repetition_levels.data(), this->values_ptr_);
writer->Close();
this->ReadColumn();
@@ -426,7 +429,7 @@ TYPED_TEST(TestPrimitiveWriter, RepeatedTooFewRows) {
auto writer = this->BuildWriter();
writer->WriteBatch(this->values_.size(), definition_levels.data(),
- repetition_levels.data(), this->values_ptr_);
+ repetition_levels.data(), this->values_ptr_);
ASSERT_THROW(writer->Close(), ParquetException);
}
@@ -485,8 +488,8 @@ TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
auto writer = this->BuildWriter(LARGE_SIZE);
// All values being written are NULL
- writer->WriteBatch(
- this->values_.size(), definition_levels.data(), repetition_levels.data(), NULL);
+ writer->WriteBatch(this->values_.size(), definition_levels.data(),
+ repetition_levels.data(), NULL);
writer->Close();
// Just read the first SMALL_SIZE rows to ensure we could read it back in
@@ -512,7 +515,7 @@ TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
}
void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
- std::vector<int16_t>& input_levels) {
+ std::vector<int16_t>& input_levels) {
// for each repetition count upto max_repeat_factor
for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
// repeat count increases by a factor of 2 for every iteration
@@ -531,7 +534,7 @@ void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
}
void EncodeLevels(Encoding::type encoding, int max_level, int num_levels,
- const int16_t* input_levels, std::vector<uint8_t>& bytes) {
+ const int16_t* input_levels, std::vector<uint8_t>& bytes) {
LevelEncoder encoder;
int levels_count = 0;
bytes.resize(2 * num_levels);
@@ -540,20 +543,21 @@ void EncodeLevels(Encoding::type encoding, int max_level, int num_levels,
if (encoding == Encoding::RLE) {
// leave space to write the rle length value
encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(int32_t),
- static_cast<int>(bytes.size()));
+ static_cast<int>(bytes.size()));
levels_count = encoder.Encode(num_levels, input_levels);
(reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
} else {
- encoder.Init(
- encoding, max_level, num_levels, bytes.data(), static_cast<int>(bytes.size()));
+ encoder.Init(encoding, max_level, num_levels, bytes.data(),
+ static_cast<int>(bytes.size()));
levels_count = encoder.Encode(num_levels, input_levels);
}
ASSERT_EQ(num_levels, levels_count);
}
void VerifyDecodingLevels(Encoding::type encoding, int max_level,
- std::vector<int16_t>& input_levels, std::vector<uint8_t>& bytes) {
+ std::vector<int16_t>& input_levels,
+ std::vector<uint8_t>& bytes) {
LevelDecoder decoder;
int levels_count = 0;
std::vector<int16_t> output_levels;
@@ -590,7 +594,8 @@ void VerifyDecodingLevels(Encoding::type encoding, int max_level,
}
void VerifyDecodingMultipleSetData(Encoding::type encoding, int max_level,
- std::vector<int16_t>& input_levels, std::vector<std::vector<uint8_t>>& bytes) {
+ std::vector<int16_t>& input_levels,
+ std::vector<std::vector<uint8_t>>& bytes) {
LevelDecoder decoder;
int levels_count = 0;
std::vector<int16_t> output_levels;
@@ -634,7 +639,7 @@ TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
// Generate levels
GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
EncodeLevels(encoding, max_level, static_cast<int>(input_levels.size()),
- input_levels.data(), bytes);
+ input_levels.data(), bytes);
VerifyDecodingLevels(encoding, max_level, input_levels, bytes);
input_levels.clear();
}
@@ -662,7 +667,7 @@ TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
for (int rf = 0; rf < setdata_factor; rf++) {
int offset = rf * split_level_size;
EncodeLevels(encoding, max_level, split_level_size,
- reinterpret_cast<int16_t*>(input_levels.data()) + offset, bytes[rf]);
+ reinterpret_cast<int16_t*>(input_levels.data()) + offset, bytes[rf]);
}
VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes);
}
@@ -685,8 +690,8 @@ TEST(TestLevelEncoder, MinimumBufferSize) {
LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode));
LevelEncoder encoder;
- encoder.Init(
- Encoding::RLE, 1, kNumToEncode, output.data(), static_cast<int>(output.size()));
+ encoder.Init(Encoding::RLE, 1, kNumToEncode, output.data(),
+ static_cast<int>(output.size()));
int encode_count = encoder.Encode(kNumToEncode, levels.data());
ASSERT_EQ(kNumToEncode, encode_count);
@@ -718,7 +723,7 @@ TEST(TestLevelEncoder, MinimumBufferSize2) {
LevelEncoder encoder;
encoder.Init(Encoding::RLE, bit_width, kNumToEncode, output.data(),
- static_cast<int>(output.size()));
+ static_cast<int>(output.size()));
int encode_count = encoder.Encode(kNumToEncode, levels.data());
ASSERT_EQ(kNumToEncode, encode_count);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 21550da..b36f395 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -35,7 +35,7 @@ LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}
void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
- int num_buffered_values, uint8_t* data, int data_size) {
+ int num_buffered_values, uint8_t* data, int data_size) {
bit_width_ = BitUtil::Log2(max_level + 1);
encoding_ = encoding;
switch (encoding) {
@@ -54,8 +54,8 @@ void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
}
}
-int LevelEncoder::MaxBufferSize(
- Encoding::type encoding, int16_t max_level, int num_buffered_values) {
+int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
+ int num_buffered_values) {
int bit_width = BitUtil::Log2(max_level + 1);
int num_bytes = 0;
switch (encoding) {
@@ -84,14 +84,18 @@ int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
if (encoding_ == Encoding::RLE) {
for (int i = 0; i < batch_size; ++i) {
- if (!rle_encoder_->Put(*(levels + i))) { break; }
+ if (!rle_encoder_->Put(*(levels + i))) {
+ break;
+ }
++num_encoded;
}
rle_encoder_->Flush();
rle_length_ = rle_encoder_->len();
} else {
for (int i = 0; i < batch_size; ++i) {
- if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) { break; }
+ if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) {
+ break;
+ }
++num_encoded;
}
bit_packed_encoder_->Flush();
@@ -109,8 +113,9 @@ std::shared_ptr<WriterProperties> default_writer_properties() {
}
ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary,
- Encoding::type encoding, const WriterProperties* properties)
+ std::unique_ptr<PageWriter> pager, int64_t expected_rows,
+ bool has_dictionary, Encoding::type encoding,
+ const WriterProperties* properties)
: metadata_(metadata),
descr_(metadata->descr()),
pager_(std::move(pager)),
@@ -147,22 +152,22 @@ void ColumnWriter::InitSinks() {
void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
DCHECK(!closed_);
- definition_levels_sink_->Write(
- reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
+ definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
+ sizeof(int16_t) * num_levels);
}
void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
DCHECK(!closed_);
- repetition_levels_sink_->Write(
- reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
+ repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
+ sizeof(int16_t) * num_levels);
}
// return the size of the encoded buffer
-int64_t ColumnWriter::RleEncodeLevels(
- const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level) {
+int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer,
+ ResizableBuffer* dest_buffer, int16_t max_level) {
// TODO: This only works with due to some RLE specifics
int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
- static_cast<int>(num_buffered_values_)) +
+ static_cast<int>(num_buffered_values_)) +
sizeof(int32_t);
// Use Arrow::Buffer::shrink_to_fit = false
@@ -170,10 +175,11 @@ int64_t ColumnWriter::RleEncodeLevels(
PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));
level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
- dest_buffer->mutable_data() + sizeof(int32_t),
- static_cast<int>(dest_buffer->size()) - sizeof(int32_t));
- int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
- reinterpret_cast<const int16_t*>(src_buffer.data()));
+ dest_buffer->mutable_data() + sizeof(int32_t),
+ static_cast<int>(dest_buffer->size()) - sizeof(int32_t));
+ int encoded =
+ level_encoder_.Encode(static_cast<int>(num_buffered_values_),
+ reinterpret_cast<const int16_t*>(src_buffer.data()));
DCHECK_EQ(encoded, num_buffered_values_);
reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
@@ -187,13 +193,15 @@ void ColumnWriter::AddDataPage() {
std::shared_ptr<Buffer> values = GetValuesBuffer();
if (descr_->max_definition_level() > 0) {
- definition_levels_rle_size = RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
- definition_levels_rle_.get(), descr_->max_definition_level());
+ definition_levels_rle_size =
+ RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
+ definition_levels_rle_.get(), descr_->max_definition_level());
}
if (descr_->max_repetition_level() > 0) {
- repetition_levels_rle_size = RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
- repetition_levels_rle_.get(), descr_->max_repetition_level());
+ repetition_levels_rle_size =
+ RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
+ repetition_levels_rle_.get(), descr_->max_repetition_level());
}
int64_t uncompressed_size =
@@ -226,15 +234,16 @@ void ColumnWriter::AddDataPage() {
// if dictionary encoding has fallen back to PLAIN
if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
std::shared_ptr<Buffer> compressed_data_copy;
- PARQUET_THROW_NOT_OK(compressed_data->Copy(
- 0, compressed_data->size(), allocator_, &compressed_data_copy));
+ PARQUET_THROW_NOT_OK(compressed_data->Copy(0, compressed_data->size(), allocator_,
+ &compressed_data_copy));
CompressedDataPage page(compressed_data_copy,
- static_cast<int32_t>(num_buffered_values_), encoding_, Encoding::RLE,
- Encoding::RLE, uncompressed_size, page_stats);
+ static_cast<int32_t>(num_buffered_values_), encoding_,
+ Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
data_pages_.push_back(std::move(page));
} else { // Eagerly write pages
CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
- encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
+ encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size,
+ page_stats);
WriteDataPage(page);
}
@@ -251,7 +260,9 @@ void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
int64_t ColumnWriter::Close() {
if (!closed_) {
closed_ = true;
- if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
+ if (has_dictionary_ && !fallback_) {
+ WriteDictionaryPage();
+ }
FlushBufferedDataPages();
@@ -272,7 +283,9 @@ int64_t ColumnWriter::Close() {
void ColumnWriter::FlushBufferedDataPages() {
// Write all outstanding data to a new page
- if (num_buffered_values_ > 0) { AddDataPage(); }
+ if (num_buffered_values_ > 0) {
+ AddDataPage();
+ }
for (size_t i = 0; i < data_pages_.size(); i++) {
WriteDataPage(data_pages_[i]);
}
@@ -284,12 +297,13 @@ void ColumnWriter::FlushBufferedDataPages() {
template <typename Type>
TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
- const WriterProperties* properties)
+ std::unique_ptr<PageWriter> pager,
+ int64_t expected_rows, Encoding::type encoding,
+ const WriterProperties* properties)
: ColumnWriter(metadata, std::move(pager), expected_rows,
- (encoding == Encoding::PLAIN_DICTIONARY ||
- encoding == Encoding::RLE_DICTIONARY),
- encoding, properties) {
+ (encoding == Encoding::PLAIN_DICTIONARY ||
+ encoding == Encoding::RLE_DICTIONARY),
+ encoding, properties) {
switch (encoding) {
case Encoding::PLAIN:
current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool()));
@@ -334,8 +348,8 @@ void TypedColumnWriter<Type>::WriteDictionaryPage() {
// TODO Get rid of this deep call
dict_encoder->mem_pool()->FreeAll();
- DictionaryPage page(
- buffer, dict_encoder->num_entries(), properties_->dictionary_index_encoding());
+ DictionaryPage page(buffer, dict_encoder->num_entries(),
+ properties_->dictionary_index_encoding());
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}
@@ -365,8 +379,9 @@ void TypedColumnWriter<Type>::ResetPageStatistics() {
// Dynamic column writer constructor
std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows,
- const WriterProperties* properties) {
+ std::unique_ptr<PageWriter> pager,
+ int64_t expected_rows,
+ const WriterProperties* properties) {
const ColumnDescriptor* descr = metadata->descr();
Encoding::type encoding = properties->encoding(descr->path());
if (properties->dictionary_enabled(descr->path()) &&
@@ -375,26 +390,26 @@ std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
}
switch (descr->physical_type()) {
case Type::BOOLEAN:
- return std::make_shared<BoolWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
+ return std::make_shared<BoolWriter>(metadata, std::move(pager), expected_rows,
+ encoding, properties);
case Type::INT32:
- return std::make_shared<Int32Writer>(
- metadata, std::move(pager), expected_rows, encoding, properties);
+ return std::make_shared<Int32Writer>(metadata, std::move(pager), expected_rows,
+ encoding, properties);
case Type::INT64:
- return std::make_shared<Int64Writer>(
- metadata, std::move(pager), expected_rows, encoding, properties);
+ return std::make_shared<Int64Writer>(metadata, std::move(pager), expected_rows,
+ encoding, properties);
case Type::INT96:
- return std::make_shared<Int96Writer>(
- metadata, std::move(pager), expected_rows, encoding, properties);
+ return std::make_shared<Int96Writer>(metadata, std::move(pager), expected_rows,
+ encoding, properties);
case Type::FLOAT:
- return std::make_shared<FloatWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
+ return std::make_shared<FloatWriter>(metadata, std::move(pager), expected_rows,
+ encoding, properties);
case Type::DOUBLE:
- return std::make_shared<DoubleWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
+ return std::make_shared<DoubleWriter>(metadata, std::move(pager), expected_rows,
+ encoding, properties);
case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
+ return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), expected_rows,
+ encoding, properties);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_shared<FixedLenByteArrayWriter>(
metadata, std::move(pager), expected_rows, encoding, properties);
@@ -410,12 +425,16 @@ std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* met
template <typename DType>
inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
- const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
+ const int16_t* def_levels,
+ const int16_t* rep_levels,
+ const T* values) {
int64_t values_to_write = 0;
// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
for (int64_t i = 0; i < num_values; ++i) {
- if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
+ if (def_levels[i] == descr_->max_definition_level()) {
+ ++values_to_write;
+ }
}
WriteDefinitionLevels(num_values, def_levels);
@@ -429,7 +448,9 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
// A row could include more than one value
// Count the occasions where we start a new row
for (int64_t i = 0; i < num_values; ++i) {
- if (rep_levels[i] == 0) { num_rows_++; }
+ if (rep_levels[i] == 0) {
+ num_rows_++;
+ }
}
WriteRepetitionLevels(num_values, rep_levels);
@@ -443,7 +464,9 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
}
// PARQUET-780
- if (values_to_write > 0) { DCHECK(nullptr != values) << "Values ptr cannot be NULL"; }
+ if (values_to_write > 0) {
+ DCHECK(nullptr != values) << "Values ptr cannot be NULL";
+ }
WriteValues(values_to_write, values);
@@ -457,25 +480,34 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
AddDataPage();
}
- if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+ if (has_dictionary_ && !fallback_) {
+ CheckDictionarySizeLimit();
+ }
return values_to_write;
}
template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written) {
+inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(
+ int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels,
+ const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values,
+ int64_t* num_spaced_written) {
int64_t values_to_write = 0;
int64_t spaced_values_to_write = 0;
// If the field is required and non-repeated, there are no definition levels
if (descr_->max_definition_level() > 0) {
// Minimal definition level for which spaced values are written
int16_t min_spaced_def_level = descr_->max_definition_level();
- if (descr_->schema_node()->is_optional()) { min_spaced_def_level--; }
+ if (descr_->schema_node()->is_optional()) {
+ min_spaced_def_level--;
+ }
for (int64_t i = 0; i < num_values; ++i) {
- if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
- if (def_levels[i] >= min_spaced_def_level) { ++spaced_values_to_write; }
+ if (def_levels[i] == descr_->max_definition_level()) {
+ ++values_to_write;
+ }
+ if (def_levels[i] >= min_spaced_def_level) {
+ ++spaced_values_to_write;
+ }
}
WriteDefinitionLevels(num_values, def_levels);
@@ -490,7 +522,9 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values
// A row could include more than one value
// Count the occasions where we start a new row
for (int64_t i = 0; i < num_values; ++i) {
- if (rep_levels[i] == 0) { num_rows_++; }
+ if (rep_levels[i] == 0) {
+ num_rows_++;
+ }
}
WriteRepetitionLevels(num_values, rep_levels);
@@ -512,7 +546,7 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values
if (page_statistics_ != nullptr) {
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
- num_values - values_to_write);
+ num_values - values_to_write);
}
num_buffered_values_ += num_values;
@@ -521,14 +555,16 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values
if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
AddDataPage();
}
- if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+ if (has_dictionary_ && !fallback_) {
+ CheckDictionarySizeLimit();
+ }
return values_to_write;
}
template <typename DType>
void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values) {
+ const int16_t* rep_levels, const T* values) {
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can be much above the limit.
// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -541,19 +577,19 @@ void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def
for (int round = 0; round < num_batches; round++) {
int64_t offset = round * write_batch_size;
int64_t num_values = WriteMiniBatch(write_batch_size, &def_levels[offset],
- &rep_levels[offset], &values[value_offset]);
+ &rep_levels[offset], &values[value_offset]);
value_offset += num_values;
}
// Write the remaining values
int64_t offset = num_batches * write_batch_size;
- WriteMiniBatch(
- num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]);
+ WriteMiniBatch(num_remaining, &def_levels[offset], &rep_levels[offset],
+ &values[value_offset]);
}
template <typename DType>
-void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values) {
+void TypedColumnWriter<DType>::WriteBatchSpaced(
+ int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels,
+ const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
// We check for DataPage limits only after we have inserted the values. If a user
// writes a large number of values, the DataPage size can be much above the limit.
// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -567,15 +603,15 @@ void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
for (int round = 0; round < num_batches; round++) {
int64_t offset = round * write_batch_size;
WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset],
- valid_bits, valid_bits_offset + values_offset, values + values_offset,
- &num_spaced_written);
+ valid_bits, valid_bits_offset + values_offset,
+ values + values_offset, &num_spaced_written);
values_offset += num_spaced_written;
}
// Write the remaining values
int64_t offset = num_batches * write_batch_size;
WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset],
- valid_bits, valid_bits_offset + values_offset, values + values_offset,
- &num_spaced_written);
+ valid_bits, valid_bits_offset + values_offset,
+ values + values_offset, &num_spaced_written);
}
template <typename DType>
@@ -585,9 +621,11 @@ void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values)
template <typename DType>
void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
- const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
- current_encoder_->PutSpaced(
- values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
+ const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ const T* values) {
+ current_encoder_->PutSpaced(values, static_cast<int>(num_values), valid_bits,
+ valid_bits_offset);
}
template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column_writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 1637780..837d2d0 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -44,12 +44,12 @@ class PARQUET_EXPORT LevelEncoder {
LevelEncoder();
~LevelEncoder();
- static int MaxBufferSize(
- Encoding::type encoding, int16_t max_level, int num_buffered_values);
+ static int MaxBufferSize(Encoding::type encoding, int16_t max_level,
+ int num_buffered_values);
// Initialize the LevelEncoder.
void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values,
- uint8_t* data, int data_size);
+ uint8_t* data, int data_size);
// Encodes a batch of levels from an array and returns the number of levels encoded
int Encode(int batch_size, const int16_t* levels);
@@ -73,12 +73,13 @@ static constexpr int WRITE_BATCH_SIZE = 1000;
class PARQUET_EXPORT ColumnWriter {
public:
ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
- int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
- const WriterProperties* properties);
+ int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
+ const WriterProperties* properties);
static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
- std::unique_ptr<PageWriter>, int64_t expected_rows,
- const WriterProperties* properties);
+ std::unique_ptr<PageWriter>,
+ int64_t expected_rows,
+ const WriterProperties* properties);
Type::type type() const { return descr_->physical_type(); }
@@ -126,8 +127,8 @@ class PARQUET_EXPORT ColumnWriter {
void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
// RLE encode the src_buffer into dest_buffer and return the encoded size
- int64_t RleEncodeLevels(
- const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level);
+ int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer,
+ int16_t max_level);
// Serialize the buffered Data Pages
void FlushBufferedDataPages();
@@ -194,13 +195,13 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
typedef typename DType::c_type T;
TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
- const WriterProperties* properties);
+ std::unique_ptr<PageWriter> pager, int64_t expected_rows,
+ Encoding::type encoding, const WriterProperties* properties);
// Write a batch of repetition levels, definition levels, and values to the
// column.
void WriteBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values);
+ const int16_t* rep_levels, const T* values);
/// Write a batch of repetition levels, definition levels, and values to the
/// column.
@@ -229,8 +230,8 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
/// spacing for nulls on the lowest levels; input has the length
/// of the number of rows on the lowest nesting level.
void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const T* values);
+ const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values);
protected:
std::shared_ptr<Buffer> GetValuesBuffer() override {
@@ -244,18 +245,19 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
private:
int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values);
+ const int16_t* rep_levels, const T* values);
int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const T* values, int64_t* num_spaced_written);
+ const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values,
+ int64_t* num_spaced_written);
typedef Encoder<DType> EncoderType;
// Write values to a temporary buffer before they are encoded into pages
void WriteValues(int64_t num_values, const T* values);
void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values);
+ int64_t valid_bits_offset, const T* values);
std::unique_ptr<EncoderType> current_encoder_;
typedef TypedRowGroupStatistics<DType> TypedStats;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/encoding-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc
index 1e93ba7..97eeefa 100644
--- a/src/parquet/encoding-benchmark.cc
+++ b/src/parquet/encoding-benchmark.cc
@@ -33,8 +33,8 @@ namespace benchmark {
std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
auto node = PrimitiveNode::Make("int64", repetition, Type::INT64);
- return std::make_shared<ColumnDescriptor>(
- node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED);
+ return std::make_shared<ColumnDescriptor>(node, repetition != Repetition::REQUIRED,
+ repetition == Repetition::REPEATED);
}
static void BM_PlainEncodingBoolean(::benchmark::State& state) {
@@ -99,8 +99,8 @@ static void BM_PlainDecodingInt64(::benchmark::State& state) {
BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536);
template <typename Type>
-static void DecodeDict(
- std::vector<typename Type::c_type>& values, ::benchmark::State& state) {
+static void DecodeDict(std::vector<typename Type::c_type>& values,
+ ::benchmark::State& state) {
typedef typename Type::c_type T;
int num_values = values.size();