You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/07/31 15:15:01 UTC
[4/5] parquet-cpp git commit: PARQUET-1068: Modify .clang-format to
use straight Google format with 90-character line width
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index e941c1f..a41ad57 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -84,7 +84,9 @@ Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
int task_id;
while (!error_occurred) {
task_id = task_counter.fetch_add(1);
- if (task_id >= num_tasks) { break; }
+ if (task_id >= num_tasks) {
+ break;
+ }
Status s = func(task_id);
if (!s.ok()) {
std::lock_guard<std::mutex> lock(error_mtx);
@@ -98,7 +100,9 @@ Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
for (auto&& thread : thread_pool) {
thread.join();
}
- if (error_occurred) { return error; }
+ if (error_occurred) {
+ return error;
+ }
return Status::OK();
}
@@ -154,14 +158,16 @@ class AllRowGroupsIterator : public FileColumnIterator {
class SingleRowGroupIterator : public FileColumnIterator {
public:
- explicit SingleRowGroupIterator(
- int column_index, int row_group_number, ParquetFileReader* reader)
+ explicit SingleRowGroupIterator(int column_index, int row_group_number,
+ ParquetFileReader* reader)
: FileColumnIterator(column_index, reader),
row_group_number_(row_group_number),
done_(false) {}
std::shared_ptr<::parquet::ColumnReader> Next() override {
- if (done_) { return nullptr; }
+ if (done_) {
+ return nullptr;
+ }
auto result = reader_->RowGroup(row_group_number_)->Column(column_index_);
done_ = true;
@@ -185,16 +191,16 @@ class FileReader::Impl {
Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
- Status ReadSchemaField(
- int i, const std::vector<int>& indices, std::shared_ptr<Array>* out);
+ Status ReadSchemaField(int i, const std::vector<int>& indices,
+ std::shared_ptr<Array>* out);
Status GetReaderForNode(int index, const NodePtr& node, const std::vector<int>& indices,
- int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
+ int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
Status ReadColumn(int i, std::shared_ptr<Array>* out);
Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
- Status GetSchema(
- const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out);
+ Status GetSchema(const std::vector<int>& indices,
+ std::shared_ptr<::arrow::Schema>* out);
Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
- std::shared_ptr<::arrow::Table>* out);
+ std::shared_ptr<::arrow::Table>* out);
Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
Status ReadTable(std::shared_ptr<Table>* table);
Status ReadRowGroup(int i, std::shared_ptr<Table>* table);
@@ -258,13 +264,13 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::Impl {
Status InitValidBits(int batch_size);
template <typename ArrowType, typename ParquetType>
Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels,
- int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read,
- int64_t* values_read);
+ int16_t* rep_levels, int64_t values_to_read,
+ int64_t* levels_read, int64_t* values_read);
template <typename ArrowType, typename ParquetType>
Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
- int64_t values_to_read, int64_t* levels_read);
+ int64_t values_to_read, int64_t* levels_read);
Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
- int64_t total_values_read, std::shared_ptr<Array>* array);
+ int64_t total_values_read, std::shared_ptr<Array>* array);
Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
@@ -279,7 +285,7 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::Impl {
static constexpr bool value =
std::is_same<InType, OutType>::value ||
(std::is_integral<InType>{} && std::is_integral<OutType>{} &&
- (sizeof(InType) == sizeof(OutType)));
+ (sizeof(InType) == sizeof(OutType)));
};
MemoryPool* pool_;
@@ -304,7 +310,7 @@ class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::Impl {
class PARQUET_NO_EXPORT StructImpl : public ColumnReader::Impl {
public:
explicit StructImpl(const std::vector<std::shared_ptr<Impl>>& children,
- int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
+ int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
: children_(children),
struct_def_level_(struct_def_level),
pool_(pool),
@@ -326,8 +332,8 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::Impl {
std::shared_ptr<Field> field_;
PoolBuffer def_levels_buffer_;
- Status DefLevelsToNullArray(
- std::shared_ptr<MutableBuffer>* null_bitmap, int64_t* null_count);
+ Status DefLevelsToNullArray(std::shared_ptr<MutableBuffer>* null_bitmap,
+ int64_t* null_count);
void InitField(const NodePtr& node, const std::vector<std::shared_ptr<Impl>>& children);
};
@@ -345,8 +351,9 @@ Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
}
Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
- const std::vector<int>& indices, int16_t def_level,
- std::unique_ptr<ColumnReader::Impl>* out) {
+ const std::vector<int>& indices,
+ int16_t def_level,
+ std::unique_ptr<ColumnReader::Impl>* out) {
*out = nullptr;
if (IsSimpleStruct(node)) {
@@ -357,9 +364,11 @@ Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
// TODO(itaiin): Remove the -1 index hack when all types of nested reads
// are supported. This currently just signals the lower level reader resolution
// to abort
- RETURN_NOT_OK(GetReaderForNode(
- index, group->field(i), indices, def_level + 1, &child_reader));
- if (child_reader != nullptr) { children.push_back(std::move(child_reader)); }
+ RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices, def_level + 1,
+ &child_reader));
+ if (child_reader != nullptr) {
+ children.push_back(std::move(child_reader));
+ }
}
if (children.size() > 0) {
@@ -402,8 +411,8 @@ Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
return ReadSchemaField(i, indices, out);
}
-Status FileReader::Impl::ReadSchemaField(
- int i, const std::vector<int>& indices, std::shared_ptr<Array>* out) {
+Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
+ std::shared_ptr<Array>* out) {
auto parquet_schema = reader_->metadata()->schema();
auto node = parquet_schema->group_node()->field(i);
@@ -437,15 +446,16 @@ Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
return flat_column_reader->NextBatch(static_cast<int>(batch_size), out);
}
-Status FileReader::Impl::GetSchema(
- const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out) {
+Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
+ std::shared_ptr<::arrow::Schema>* out) {
auto descr = reader_->metadata()->schema();
auto parquet_key_value_metadata = reader_->metadata()->key_value_metadata();
return FromParquetSchema(descr, indices, parquet_key_value_metadata, out);
}
Status FileReader::Impl::ReadRowGroup(int row_group_index,
- const std::vector<int>& indices, std::shared_ptr<::arrow::Table>* out) {
+ const std::vector<int>& indices,
+ std::shared_ptr<::arrow::Table>* out) {
std::shared_ptr<::arrow::Schema> schema;
RETURN_NOT_OK(GetSchema(indices, &schema));
@@ -458,7 +468,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
// TODO(wesm): Refactor to share more code with ReadTable
auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
- this](int i) {
+ this](int i) {
int column_index = indices[i];
int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
@@ -486,16 +496,16 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
return Status::OK();
}
-Status FileReader::Impl::ReadTable(
- const std::vector<int>& indices, std::shared_ptr<Table>* table) {
+Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
+ std::shared_ptr<Table>* table) {
std::shared_ptr<::arrow::Schema> schema;
RETURN_NOT_OK(GetSchema(indices, &schema));
// We only need to read schema fields which have columns indicated
// in the indices vector
std::vector<int> field_indices;
- if (!ColumnIndicesToFieldIndices(
- *reader_->metadata()->schema(), indices, &field_indices)) {
+ if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), indices,
+ &field_indices)) {
return Status::Invalid("Invalid column index");
}
@@ -541,20 +551,21 @@ Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
// Static ctor
Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
- MemoryPool* allocator, const ReaderProperties& props,
- const std::shared_ptr<FileMetaData>& metadata, std::unique_ptr<FileReader>* reader) {
+ MemoryPool* allocator, const ReaderProperties& props,
+ const std::shared_ptr<FileMetaData>& metadata,
+ std::unique_ptr<FileReader>* reader) {
std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(file));
std::unique_ptr<ParquetReader> pq_reader;
- PARQUET_CATCH_NOT_OK(
- pq_reader = ParquetReader::Open(std::move(io_wrapper), props, metadata));
+ PARQUET_CATCH_NOT_OK(pq_reader =
+ ParquetReader::Open(std::move(io_wrapper), props, metadata));
reader->reset(new FileReader(allocator, std::move(pq_reader)));
return Status::OK();
}
Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
- MemoryPool* allocator, std::unique_ptr<FileReader>* reader) {
- return OpenFile(
- file, allocator, ::parquet::default_reader_properties(), nullptr, reader);
+ MemoryPool* allocator, std::unique_ptr<FileReader>* reader) {
+ return OpenFile(file, allocator, ::parquet::default_reader_properties(), nullptr,
+ reader);
}
Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
@@ -585,8 +596,8 @@ Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
}
}
-Status FileReader::ReadTable(
- const std::vector<int>& indices, std::shared_ptr<Table>* out) {
+Status FileReader::ReadTable(const std::vector<int>& indices,
+ std::shared_ptr<Table>* out) {
try {
return impl_->ReadTable(indices, out);
} catch (const ::parquet::ParquetException& e) {
@@ -602,8 +613,8 @@ Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) {
}
}
-Status FileReader::ReadRowGroup(
- int i, const std::vector<int>& indices, std::shared_ptr<Table>* out) {
+Status FileReader::ReadRowGroup(int i, const std::vector<int>& indices,
+ std::shared_ptr<Table>* out) {
try {
return impl_->ReadRowGroup(i, indices, out);
} catch (const ::parquet::ParquetException& e) {
@@ -611,13 +622,9 @@ Status FileReader::ReadRowGroup(
}
}
-int FileReader::num_row_groups() const {
- return impl_->num_row_groups();
-}
+int FileReader::num_row_groups() const { return impl_->num_row_groups(); }
-void FileReader::set_num_threads(int num_threads) {
- impl_->set_num_threads(num_threads);
-}
+void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); }
const ParquetFileReader* FileReader::parquet_reader() const {
return impl_->parquet_reader();
@@ -625,15 +632,16 @@ const ParquetFileReader* FileReader::parquet_reader() const {
template <typename ArrowType, typename ParquetType>
Status PrimitiveImpl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
- int64_t values_to_read, int64_t* levels_read) {
+ int64_t values_to_read, int64_t* levels_read) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(static_cast<int>(values_to_read),
- nullptr, nullptr, values, &values_read));
+ PARQUET_CATCH_NOT_OK(*levels_read =
+ reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
+ nullptr, values, &values_read));
ArrowCType* out_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
std::copy(values, values + values_read, out_ptr + valid_bits_idx_);
@@ -673,8 +681,9 @@ Status PrimitiveImpl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(static_cast<int>(values_to_read),
- nullptr, nullptr, values, &values_read));
+ PARQUET_CATCH_NOT_OK(*levels_read =
+ reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
+ nullptr, values, &values_read));
int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
for (int64_t i = 0; i < values_read; i++) {
@@ -691,8 +700,9 @@ Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(static_cast<int>(values_to_read),
- nullptr, nullptr, values, &values_read));
+ PARQUET_CATCH_NOT_OK(*levels_read =
+ reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
+ nullptr, values, &values_read));
int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
for (int64_t i = 0; i < values_read; i++) {
@@ -710,11 +720,14 @@ Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(static_cast<int>(values_to_read),
- nullptr, nullptr, values, &values_read));
+ PARQUET_CATCH_NOT_OK(*levels_read =
+ reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
+ nullptr, values, &values_read));
for (int64_t i = 0; i < values_read; i++) {
- if (values[i]) { ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_); }
+ if (values[i]) {
+ ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_);
+ }
valid_bits_idx_++;
}
@@ -723,17 +736,18 @@ Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
template <typename ArrowType, typename ParquetType>
Status PrimitiveImpl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
- int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read,
- int64_t* levels_read, int64_t* values_read) {
+ int16_t* def_levels, int16_t* rep_levels,
+ int64_t values_to_read, int64_t* levels_read,
+ int64_t* values_read) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(static_cast<int>(values_to_read),
- def_levels, rep_levels, values, valid_bits_ptr_, valid_bits_idx_, levels_read,
- values_read, &null_count));
+ PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
+ static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
+ valid_bits_idx_, levels_read, values_read, &null_count));
auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
@@ -758,9 +772,10 @@ Status PrimitiveImpl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
int64_t * values_read) { \
auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_); \
int64_t null_count; \
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(static_cast<int>(values_to_read), \
- def_levels, rep_levels, data_ptr + valid_bits_idx_, valid_bits_ptr_, \
- valid_bits_idx_, levels_read, values_read, &null_count)); \
+ PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced( \
+ static_cast<int>(values_to_read), def_levels, rep_levels, \
+ data_ptr + valid_bits_idx_, valid_bits_ptr_, valid_bits_idx_, levels_read, \
+ values_read, &null_count)); \
\
valid_bits_idx_ += *values_read; \
null_count_ += null_count; \
@@ -784,9 +799,9 @@ Status PrimitiveImpl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(static_cast<int>(values_to_read),
- def_levels, rep_levels, values, valid_bits_ptr_, valid_bits_idx_, levels_read,
- values_read, &null_count));
+ PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
+ static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
+ valid_bits_idx_, levels_read, values_read, &null_count));
auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
@@ -809,9 +824,9 @@ Status PrimitiveImpl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(static_cast<int>(values_to_read),
- def_levels, rep_levels, values, valid_bits_ptr_, valid_bits_idx_, levels_read,
- values_read, &null_count));
+ PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
+ static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
+ valid_bits_idx_, levels_read, values_read, &null_count));
auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
@@ -834,14 +849,16 @@ Status PrimitiveImpl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(static_cast<int>(values_to_read),
- def_levels, rep_levels, values, valid_bits_ptr_, valid_bits_idx_, levels_read,
- values_read, &null_count));
+ PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
+ static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
+ valid_bits_idx_, levels_read, values_read, &null_count));
INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
for (int64_t i = 0; i < *values_read; i++) {
if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- if (values[i]) { ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i); }
+ if (values[i]) {
+ ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i);
+ }
}
READ_NEXT_BITSET(valid_bits_ptr_);
}
@@ -886,10 +903,13 @@ Status PrimitiveImpl::InitValidBits(int batch_size) {
}
Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
- const int16_t* rep_levels, int64_t total_levels_read, std::shared_ptr<Array>* array) {
+ const int16_t* rep_levels,
+ int64_t total_levels_read,
+ std::shared_ptr<Array>* array) {
std::shared_ptr<::arrow::Schema> arrow_schema;
RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
- input_->metadata()->key_value_metadata(), &arrow_schema));
+ input_->metadata()->key_value_metadata(),
+ &arrow_schema));
std::shared_ptr<Field> current_field = arrow_schema->field(0);
if (descr_->max_repetition_level() > 0) {
@@ -920,14 +940,18 @@ Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
// This describes the minimal definition that describes a level that
// reflects a value in the primitive values array.
int16_t values_def_level = descr_->max_definition_level();
- if (nullable[nullable.size() - 1]) { values_def_level--; }
+ if (nullable[nullable.size() - 1]) {
+ values_def_level--;
+ }
// The definition levels that are needed so that a list is declared
// as empty and not null.
std::vector<int16_t> empty_def_level(list_depth);
int def_level = 0;
for (int i = 0; i < list_depth; i++) {
- if (nullable[i]) { def_level++; }
+ if (nullable[i]) {
+ def_level++;
+ }
empty_def_level[i] = def_level;
def_level++;
}
@@ -951,11 +975,15 @@ Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
break;
} else {
RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
- if (empty_def_level[j] == def_levels[i]) { break; }
+ if (empty_def_level[j] == def_levels[i]) {
+ break;
+ }
}
}
}
- if (def_levels[i] >= values_def_level) { values_offset++; }
+ if (def_levels[i] >= values_def_level) {
+ values_offset++;
+ }
}
// Add the final offset to all lists
for (int64_t j = 0; j < list_depth; j++) {
@@ -1013,18 +1041,20 @@ Status PrimitiveImpl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out
int64_t values_read;
int64_t levels_read;
if (descr_->max_definition_level() == 0) {
- RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>(
- reader, values_to_read, &values_read)));
+ RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>(reader, values_to_read,
+ &values_read)));
} else {
// As per the definition and checks for flat (list) columns:
// descr_->max_definition_level() > 0, <= 3
- RETURN_NOT_OK((ReadNullableBatch<ArrowType, ParquetType>(reader,
- def_levels + total_levels_read, rep_levels + total_levels_read, values_to_read,
- &levels_read, &values_read)));
+ RETURN_NOT_OK((ReadNullableBatch<ArrowType, ParquetType>(
+ reader, def_levels + total_levels_read, rep_levels + total_levels_read,
+ values_to_read, &levels_read, &values_read)));
total_levels_read += static_cast<int>(levels_read);
}
values_to_read -= static_cast<int>(values_read);
- if (!column_reader_->HasNext()) { NextRowGroup(); }
+ if (!column_reader_->HasNext()) {
+ NextRowGroup();
+ }
}
// Shrink arrays as they may be larger than the output.
@@ -1039,8 +1069,8 @@ Status PrimitiveImpl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out
// Release the ownership as the Buffer is now part of a new Array
valid_bits_buffer_.reset();
} else {
- *out = std::make_shared<ArrayType<ArrowType>>(
- field_->type(), valid_bits_idx_, data_buffer_);
+ *out = std::make_shared<ArrayType<ArrowType>>(field_->type(), valid_bits_idx_,
+ data_buffer_);
}
// Release the ownership as the Buffer is now part of a new Array
data_buffer_.reset();
@@ -1076,13 +1106,15 @@ Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
} else {
// As per the definition and checks for flat columns:
// descr_->max_definition_level() == 1
- RETURN_NOT_OK((ReadNullableBatch<::arrow::BooleanType, BooleanType>(reader,
- def_levels + total_levels_read, rep_levels + total_levels_read, values_to_read,
- &levels_read, &values_read)));
+ RETURN_NOT_OK((ReadNullableBatch<::arrow::BooleanType, BooleanType>(
+ reader, def_levels + total_levels_read, rep_levels + total_levels_read,
+ values_to_read, &levels_read, &values_read)));
total_levels_read += static_cast<int>(levels_read);
}
values_to_read -= static_cast<int>(values_read);
- if (!column_reader_->HasNext()) { NextRowGroup(); }
+ if (!column_reader_->HasNext()) {
+ NextRowGroup();
+ }
}
if (descr_->max_definition_level() > 0) {
@@ -1102,11 +1134,11 @@ Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
RETURN_NOT_OK(
valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
- valid_bits_buffer->size());
+ valid_bits_buffer->size());
valid_bits_buffer_ = valid_bits_buffer;
}
- *out = std::make_shared<BooleanArray>(
- field_->type(), valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
+ *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_,
+ valid_bits_buffer_, null_count_);
// Release the ownership
data_buffer_.reset();
valid_bits_buffer_.reset();
@@ -1141,9 +1173,9 @@ Status PrimitiveImpl::ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>*
int64_t values_read;
int64_t levels_read;
auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
- PARQUET_CATCH_NOT_OK(
- levels_read = reader->ReadBatch(values_to_read, def_levels + total_levels_read,
- rep_levels + total_levels_read, values, &values_read));
+ PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+ values_to_read, def_levels + total_levels_read,
+ rep_levels + total_levels_read, values, &values_read));
values_to_read -= static_cast<int>(levels_read);
if (descr_->max_definition_level() == 0) {
for (int64_t i = 0; i < levels_read; i++) {
@@ -1161,13 +1193,15 @@ Status PrimitiveImpl::ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>*
} else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
RETURN_NOT_OK(
builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
- values[values_idx].len));
+ values[values_idx].len));
values_idx++;
}
}
total_levels_read += static_cast<int>(levels_read);
}
- if (!column_reader_->HasNext()) { NextRowGroup(); }
+ if (!column_reader_->HasNext()) {
+ NextRowGroup();
+ }
}
RETURN_NOT_OK(builder.Finish(out));
@@ -1176,8 +1210,8 @@ Status PrimitiveImpl::ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>*
}
template <typename ArrowType>
-Status PrimitiveImpl::ReadFLBABatch(
- int batch_size, int byte_width, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::ReadFLBABatch(int batch_size, int byte_width,
+ std::shared_ptr<Array>* out) {
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
int total_levels_read = 0;
if (descr_->max_definition_level() > 0) {
@@ -1197,9 +1231,9 @@ Status PrimitiveImpl::ReadFLBABatch(
int64_t values_read;
int64_t levels_read;
auto values = reinterpret_cast<FLBA*>(values_buffer_.mutable_data());
- PARQUET_CATCH_NOT_OK(
- levels_read = reader->ReadBatch(values_to_read, def_levels + total_levels_read,
- rep_levels + total_levels_read, values, &values_read));
+ PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+ values_to_read, def_levels + total_levels_read,
+ rep_levels + total_levels_read, values, &values_read));
values_to_read -= static_cast<int>(levels_read);
if (descr_->max_definition_level() == 0) {
for (int64_t i = 0; i < levels_read; i++) {
@@ -1219,7 +1253,9 @@ Status PrimitiveImpl::ReadFLBABatch(
}
total_levels_read += static_cast<int>(levels_read);
}
- if (!column_reader_->HasNext()) { NextRowGroup(); }
+ if (!column_reader_->HasNext()) {
+ NextRowGroup();
+ }
}
RETURN_NOT_OK(builder.Finish(out));
@@ -1304,9 +1340,7 @@ Status PrimitiveImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
}
}
-void PrimitiveImpl::NextRowGroup() {
- column_reader_ = input_->Next();
-}
+void PrimitiveImpl::NextRowGroup() { column_reader_ = input_->Next(); }
Status PrimitiveImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
*data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
@@ -1330,8 +1364,8 @@ Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
// StructImpl methods
-Status StructImpl::DefLevelsToNullArray(
- std::shared_ptr<MutableBuffer>* null_bitmap_out, int64_t* null_count_out) {
+Status StructImpl::DefLevelsToNullArray(std::shared_ptr<MutableBuffer>* null_bitmap_out,
+ int64_t* null_count_out) {
std::shared_ptr<MutableBuffer> null_bitmap;
auto null_count = 0;
ValueLevelsPtr def_levels_data;
@@ -1387,7 +1421,7 @@ Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
// Check that value is either uninitialized, or current
// and previous children def levels agree on the struct level
DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) ==
- (child_def_levels[i] >= struct_def_level_)));
+ (child_def_levels[i] >= struct_def_level_)));
result_levels[i] =
std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
}
@@ -1397,8 +1431,8 @@ Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
return Status::OK();
}
-void StructImpl::InitField(
- const NodePtr& node, const std::vector<std::shared_ptr<Impl>>& children) {
+void StructImpl::InitField(const NodePtr& node,
+ const std::vector<std::shared_ptr<Impl>>& children) {
// Make a shallow node to field conversion from the children fields
std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
for (size_t i = 0; i < children.size(); i++) {
@@ -1428,8 +1462,8 @@ Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
- *out = std::make_shared<StructArray>(
- field()->type(), batch_size, children_arrays, null_bitmap, null_count);
+ *out = std::make_shared<StructArray>(field()->type(), batch_size, children_arrays,
+ null_bitmap, null_count);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 8d9aeb5..f9688fb 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -130,19 +130,19 @@ class PARQUET_EXPORT FileReader {
// i=1 indices={3} will read foo2 column
// i=1 indices={2} will result in out=nullptr
// leaf indices which are unrelated to the schema field are ignored
- ::arrow::Status ReadSchemaField(
- int i, const std::vector<int>& indices, std::shared_ptr<::arrow::Array>* out);
+ ::arrow::Status ReadSchemaField(int i, const std::vector<int>& indices,
+ std::shared_ptr<::arrow::Array>* out);
// Read a table of columns into a Table
::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
// Read a table of columns into a Table. Read only the indicated column
// indices (relative to the schema)
- ::arrow::Status ReadTable(
- const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);
+ ::arrow::Status ReadTable(const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Table>* out);
::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
- std::shared_ptr<::arrow::Table>* out);
+ std::shared_ptr<::arrow::Table>* out);
::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);
@@ -198,12 +198,15 @@ class PARQUET_EXPORT ColumnReader {
// metadata : separately-computed file metadata, can be nullptr
PARQUET_EXPORT
::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
- ::arrow::MemoryPool* allocator, const ReaderProperties& properties,
- const std::shared_ptr<FileMetaData>& metadata, std::unique_ptr<FileReader>* reader);
+ ::arrow::MemoryPool* allocator,
+ const ReaderProperties& properties,
+ const std::shared_ptr<FileMetaData>& metadata,
+ std::unique_ptr<FileReader>* reader);
PARQUET_EXPORT
::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
- ::arrow::MemoryPool* allocator, std::unique_ptr<FileReader>* reader);
+ ::arrow::MemoryPool* allocator,
+ std::unique_ptr<FileReader>* reader);
} // namespace arrow
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index d14ee4f..b0cde36 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -202,21 +202,25 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
// Forward declaration
Status NodeToFieldInternal(const NodePtr& node,
- const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out);
+ const std::unordered_set<NodePtr>* included_leaf_nodes,
+ std::shared_ptr<Field>* out);
/*
* Auxilary function to test if a parquet schema node is a leaf node
* that should be included in a resulting arrow schema
*/
-inline bool IsIncludedLeaf(
- const NodePtr& node, const std::unordered_set<NodePtr>* included_leaf_nodes) {
- if (included_leaf_nodes == nullptr) { return true; }
+inline bool IsIncludedLeaf(const NodePtr& node,
+ const std::unordered_set<NodePtr>* included_leaf_nodes) {
+ if (included_leaf_nodes == nullptr) {
+ return true;
+ }
auto search = included_leaf_nodes->find(node);
return (search != included_leaf_nodes->end());
}
Status StructFromGroup(const GroupNode* group,
- const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
+ const std::unordered_set<NodePtr>* included_leaf_nodes,
+ TypePtr* out) {
std::vector<std::shared_ptr<Field>> fields;
std::shared_ptr<Field> field;
@@ -224,14 +228,18 @@ Status StructFromGroup(const GroupNode* group,
for (int i = 0; i < group->field_count(); i++) {
RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field));
- if (field != nullptr) { fields.push_back(field); }
+ if (field != nullptr) {
+ fields.push_back(field);
+ }
+ }
+ if (fields.size() > 0) {
+ *out = std::make_shared<::arrow::StructType>(fields);
}
- if (fields.size() > 0) { *out = std::make_shared<::arrow::StructType>(fields); }
return Status::OK();
}
Status NodeToList(const GroupNode* group,
- const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
+ const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
*out = nullptr;
if (group->field_count() == 1) {
// This attempts to resolve the preferred 3-level list encoding.
@@ -247,7 +255,9 @@ Status NodeToList(const GroupNode* group,
RETURN_NOT_OK(
NodeToFieldInternal(list_group->field(0), included_leaf_nodes, &item_field));
- if (item_field != nullptr) { *out = ::arrow::list(item_field); }
+ if (item_field != nullptr) {
+ *out = ::arrow::list(item_field);
+ }
} else {
// List of struct
std::shared_ptr<::arrow::DataType> inner_type;
@@ -283,7 +293,8 @@ Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
}
Status NodeToFieldInternal(const NodePtr& node,
- const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out) {
+ const std::unordered_set<NodePtr>* included_leaf_nodes,
+ std::shared_ptr<Field>* out) {
std::shared_ptr<::arrow::DataType> type = nullptr;
bool nullable = !node->is_required();
@@ -318,11 +329,14 @@ Status NodeToFieldInternal(const NodePtr& node,
RETURN_NOT_OK(FromPrimitive(primitive, &type));
}
}
- if (type != nullptr) { *out = std::make_shared<Field>(node->name(), type, nullable); }
+ if (type != nullptr) {
+ *out = std::make_shared<Field>(node->name(), type, nullable);
+ }
return Status::OK();
}
-Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+Status FromParquetSchema(
+ const SchemaDescriptor* parquet_schema,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
std::shared_ptr<::arrow::Schema>* out) {
const GroupNode* schema_node = parquet_schema->group_node();
@@ -337,8 +351,8 @@ Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
return Status::OK();
}
-Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const std::vector<int>& column_indices,
+Status FromParquetSchema(
+ const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
std::shared_ptr<::arrow::Schema>* out) {
// TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
@@ -356,14 +370,18 @@ Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
included_leaf_nodes.insert(column_desc->schema_node());
auto column_root = parquet_schema->GetColumnRoot(column_indices[i]);
auto insertion = top_nodes.insert(column_root);
- if (insertion.second) { base_nodes.push_back(column_root); }
+ if (insertion.second) {
+ base_nodes.push_back(column_root);
+ }
}
std::vector<std::shared_ptr<Field>> fields;
std::shared_ptr<Field> field;
for (auto node : base_nodes) {
RETURN_NOT_OK(NodeToFieldInternal(node, &included_leaf_nodes, &field));
- if (field != nullptr) { fields.push_back(field); }
+ if (field != nullptr) {
+ fields.push_back(field);
+ }
}
*out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
@@ -371,18 +389,19 @@ Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
}
Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out) {
+ const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Schema>* out) {
return FromParquetSchema(parquet_schema, column_indices, nullptr, out);
}
-Status FromParquetSchema(
- const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) {
+Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+ std::shared_ptr<::arrow::Schema>* out) {
return FromParquetSchema(parquet_schema, nullptr, out);
}
Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
- bool nullable, bool support_int96_nanoseconds, const WriterProperties& properties,
- NodePtr* out) {
+ bool nullable, bool support_int96_nanoseconds,
+ const WriterProperties& properties, NodePtr* out) {
Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
NodePtr element;
@@ -395,8 +414,9 @@ Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::str
}
Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
- const std::string& name, bool nullable, bool support_int96_nanoseconds,
- const WriterProperties& properties, NodePtr* out) {
+ const std::string& name, bool nullable,
+ bool support_int96_nanoseconds, const WriterProperties& properties,
+ NodePtr* out) {
Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
std::vector<NodePtr> children(type->num_children());
@@ -410,7 +430,8 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
}
Status FieldToNode(const std::shared_ptr<Field>& field,
- const WriterProperties& properties, NodePtr* out, bool support_int96_nanoseconds) {
+ const WriterProperties& properties, NodePtr* out,
+ bool support_int96_nanoseconds) {
LogicalType::type logical_type = LogicalType::NONE;
ParquetType::type type;
Repetition::type repetition =
@@ -524,12 +545,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
case ArrowType::STRUCT: {
auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
return StructToNode(struct_type, field->name(), field->nullable(),
- support_int96_nanoseconds, properties, out);
+ support_int96_nanoseconds, properties, out);
} break;
case ArrowType::LIST: {
auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type());
return ListToNode(list_type, field->name(), field->nullable(),
- support_int96_nanoseconds, properties, out);
+ support_int96_nanoseconds, properties, out);
} break;
default:
// TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
@@ -540,12 +561,13 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
}
Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
- const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out,
- bool support_int96_nanoseconds) {
+ const WriterProperties& properties,
+ std::shared_ptr<SchemaDescriptor>* out,
+ bool support_int96_nanoseconds) {
std::vector<NodePtr> nodes(arrow_schema->num_fields());
for (int i = 0; i < arrow_schema->num_fields(); i++) {
- RETURN_NOT_OK(FieldToNode(
- arrow_schema->field(i), properties, &nodes[i], support_int96_nanoseconds));
+ RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i],
+ support_int96_nanoseconds));
}
NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/arrow/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index d4f5ea3..7d1f27e 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -36,8 +36,8 @@ namespace parquet {
namespace arrow {
-::arrow::Status PARQUET_EXPORT NodeToField(
- const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out);
+::arrow::Status PARQUET_EXPORT NodeToField(const schema::NodePtr& node,
+ std::shared_ptr<::arrow::Field>* out);
/// Convert parquet schema to arrow schema with selected indices
/// \param parquet_schema to be converted
@@ -47,31 +47,35 @@ namespace arrow {
/// \param key_value_metadata optional metadata, can be nullptr
/// \param out the corresponding arrow schema
/// \return Status::OK() on a successful conversion.
-::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const std::vector<int>& column_indices,
+::arrow::Status PARQUET_EXPORT FromParquetSchema(
+ const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
std::shared_ptr<::arrow::Schema>* out);
// Without indices
-::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
- std::shared_ptr<::arrow::Schema>* out);
+::arrow::Status PARQUET_EXPORT
+FromParquetSchema(const SchemaDescriptor* parquet_schema,
+ const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
+ std::shared_ptr<::arrow::Schema>* out);
// Without metadata
::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
- const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out);
+ const std::vector<int>& column_indices,
+ std::shared_ptr<::arrow::Schema>* out);
// Without metadata or indices
-::arrow::Status PARQUET_EXPORT FromParquetSchema(
- const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);
+::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
+ std::shared_ptr<::arrow::Schema>* out);
::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field,
- const WriterProperties& properties, schema::NodePtr* out,
- bool support_int96_nanoseconds = false);
+ const WriterProperties& properties,
+ schema::NodePtr* out,
+ bool support_int96_nanoseconds = false);
::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema,
- const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out,
- bool support_int96_nanoseconds = false);
+ const WriterProperties& properties,
+ std::shared_ptr<SchemaDescriptor>* out,
+ bool support_int96_nanoseconds = false);
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 946afad..5980199 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -67,8 +67,8 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
// Passing data type so this will work with TimestampType too
- ::arrow::NumericBuilder<ArrowType> builder(
- ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+ ::arrow::NumericBuilder<ArrowType> builder(::arrow::default_memory_pool(),
+ std::make_shared<ArrowType>());
RETURN_NOT_OK(builder.Append(values.data(), values.size()));
return builder.Finish(out);
}
@@ -83,8 +83,8 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullAr
}
// Passing data type so this will work with TimestampType too
- ::arrow::NumericBuilder<ArrowType> builder(
- ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+ ::arrow::NumericBuilder<ArrowType> builder(::arrow::default_memory_pool(),
+ std::make_shared<ArrowType>());
builder.Append(values.data(), values.size());
return builder.Finish(out);
}
@@ -129,8 +129,8 @@ template <typename ArrowType>
typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;
- ::arrow::test::random_real<typename ArrowType::c_type>(
- size, seed, -1e10, 1e10, &values);
+ ::arrow::test::random_real<typename ArrowType::c_type>(size, seed, -1e10, 1e10,
+ &values);
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
@@ -159,8 +159,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Arra
}
// Passing data type so this will work with TimestampType too
- ::arrow::NumericBuilder<ArrowType> builder(
- ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+ ::arrow::NumericBuilder<ArrowType> builder(::arrow::default_memory_pool(),
+ std::make_shared<ArrowType>());
RETURN_NOT_OK(builder.Append(values.data(), values.size(), valid_bytes.data()));
return builder.Finish(out);
}
@@ -183,8 +183,8 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableA
}
// Passing data type so this will work with TimestampType too
- ::arrow::NumericBuilder<ArrowType> builder(
- ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+ ::arrow::NumericBuilder<ArrowType> builder(::arrow::default_memory_pool(),
+ std::make_shared<ArrowType>());
builder.Append(values.data(), values.size(), valid_bytes.data());
return builder.Finish(out);
}
@@ -193,8 +193,8 @@ typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableA
template <typename ArrowType>
typename std::enable_if<
is_arrow_string<ArrowType>::value || is_arrow_binary<ArrowType>::value, Status>::type
-NullableArray(
- size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) {
+NullableArray(size_t size, size_t num_nulls, uint32_t seed,
+ std::shared_ptr<::arrow::Array>* out) {
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
@@ -221,8 +221,8 @@ NullableArray(
// same as NullableArray<String|Binary>(..)
template <typename ArrowType>
typename std::enable_if<is_arrow_fixed_size_binary<ArrowType>::value, Status>::type
-NullableArray(
- size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) {
+NullableArray(size_t size, size_t num_nulls, uint32_t seed,
+ std::shared_ptr<::arrow::Array>* out) {
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
@@ -231,8 +231,8 @@ NullableArray(
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
const int byte_width = 10;
- BuilderType builder(
- ::arrow::default_memory_pool(), ::arrow::fixed_size_binary(byte_width));
+ BuilderType builder(::arrow::default_memory_pool(),
+ ::arrow::fixed_size_binary(byte_width));
const int kBufferSize = byte_width;
uint8_t buffer[kBufferSize];
@@ -272,7 +272,8 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableA
///
/// This helper function only supports (size/2) nulls.
Status MakeListArray(const std::shared_ptr<Array>& values, int64_t size,
- int64_t null_count, bool nullable_values, std::shared_ptr<::arrow::ListArray>* out) {
+ int64_t null_count, bool nullable_values,
+ std::shared_ptr<::arrow::ListArray>* out) {
// We always include an empty list
int64_t non_null_entries = size - null_count - 1;
int64_t length_per_entry = values->length() / non_null_entries;
@@ -294,33 +295,37 @@ Status MakeListArray(const std::shared_ptr<Array>& values, int64_t size,
if (!(((i % 2) == 0) && ((i / 2) < null_count))) {
// Non-null list (list with index 1 is always empty).
::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
- if (i != 1) { current_offset += static_cast<int32_t>(length_per_entry); }
+ if (i != 1) {
+ current_offset += static_cast<int32_t>(length_per_entry);
+ }
}
}
offsets_ptr[size] = static_cast<int32_t>(values->length());
auto value_field =
std::make_shared<::arrow::Field>("item", values->type(), nullable_values);
- *out = std::make_shared<::arrow::ListArray>(
- ::arrow::list(value_field), size, offsets, values, null_bitmap, null_count);
+ *out = std::make_shared<::arrow::ListArray>(::arrow::list(value_field), size, offsets,
+ values, null_bitmap, null_count);
return Status::OK();
}
-static std::shared_ptr<::arrow::Column> MakeColumn(
- const std::string& name, const std::shared_ptr<Array>& array, bool nullable) {
+static std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
+ const std::shared_ptr<Array>& array,
+ bool nullable) {
auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
return std::make_shared<::arrow::Column>(field, array);
}
-static std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
- const std::vector<std::shared_ptr<Array>>& arrays, bool nullable) {
+static std::shared_ptr<::arrow::Column> MakeColumn(
+ const std::string& name, const std::vector<std::shared_ptr<Array>>& arrays,
+ bool nullable) {
auto field = std::make_shared<::arrow::Field>(name, arrays[0]->type(), nullable);
return std::make_shared<::arrow::Column>(field, arrays);
}
-std::shared_ptr<::arrow::Table> MakeSimpleTable(
- const std::shared_ptr<Array>& values, bool nullable) {
+std::shared_ptr<::arrow::Table> MakeSimpleTable(const std::shared_ptr<Array>& values,
+ bool nullable) {
std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable);
std::vector<std::shared_ptr<::arrow::Column>> columns({column});
std::vector<std::shared_ptr<::arrow::Field>> fields({column->field()});
@@ -341,15 +346,15 @@ void ExpectArrayT(void* expected, Array* result) {
::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result);
for (int64_t i = 0; i < result->length(); i++) {
EXPECT_EQ(reinterpret_cast<typename ArrowType::c_type*>(expected)[i],
- reinterpret_cast<const typename ArrowType::c_type*>(
- p_array->values()->data())[i]);
+ reinterpret_cast<const typename ArrowType::c_type*>(
+ p_array->values()->data())[i]);
}
}
template <>
void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
- ::arrow::BooleanBuilder builder(
- ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+ ::arrow::BooleanBuilder builder(::arrow::default_memory_pool(),
+ std::make_shared<::arrow::BooleanType>());
EXPECT_OK(builder.Append(reinterpret_cast<uint8_t*>(expected), result->length()));
std::shared_ptr<Array> expected_array;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index c562b27..41c1146 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -104,9 +104,11 @@ class LevelBuilder {
NOT_IMPLEMENTED_VISIT(Interval)
Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
- int64_t* values_offset, ::arrow::Type::type* values_type, int64_t* num_values,
- int64_t* num_levels, std::shared_ptr<Buffer>* def_levels,
- std::shared_ptr<Buffer>* rep_levels, std::shared_ptr<Array>* values_array) {
+ int64_t* values_offset, ::arrow::Type::type* values_type,
+ int64_t* num_values, int64_t* num_levels,
+ std::shared_ptr<Buffer>* def_levels,
+ std::shared_ptr<Buffer>* rep_levels,
+ std::shared_ptr<Array>* values_array) {
// Work downwards to extract bitmaps and offsets
min_offset_idx_ = 0;
max_offset_idx_ = static_cast<int32_t>(array.length());
@@ -192,17 +194,21 @@ class LevelBuilder {
int32_t inner_offset = offsets_[rep_level][index];
int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
int64_t recursion_level = rep_level + 1;
- if (inner_length == 0) { return def_levels_.Append(def_level); }
+ if (inner_length == 0) {
+ return def_levels_.Append(def_level);
+ }
if (recursion_level < static_cast<int64_t>(offsets_.size())) {
return HandleListEntries(def_level + 1, rep_level + 1, inner_offset, inner_length);
} else {
// We have reached the leaf: primitive list, handle remaining nullables
for (int64_t i = 0; i < inner_length; i++) {
- if (i > 0) { RETURN_NOT_OK(rep_levels_.Append(rep_level + 1)); }
+ if (i > 0) {
+ RETURN_NOT_OK(rep_levels_.Append(rep_level + 1));
+ }
if (nullable_[recursion_level] &&
((null_counts_[recursion_level] == 0) ||
- BitUtil::GetBit(valid_bitmaps_[recursion_level],
- inner_offset + i + array_offsets_[recursion_level]))) {
+ BitUtil::GetBit(valid_bitmaps_[recursion_level],
+ inner_offset + i + array_offsets_[recursion_level]))) {
RETURN_NOT_OK(def_levels_.Append(def_level + 2));
} else {
// This can be produced in two case:
@@ -216,10 +222,12 @@ class LevelBuilder {
}
}
- Status HandleListEntries(
- int16_t def_level, int16_t rep_level, int64_t offset, int64_t length) {
+ Status HandleListEntries(int16_t def_level, int16_t rep_level, int64_t offset,
+ int64_t length) {
for (int64_t i = 0; i < length; i++) {
- if (i > 0) { RETURN_NOT_OK(rep_levels_.Append(rep_level)); }
+ if (i > 0) {
+ RETURN_NOT_OK(rep_levels_.Append(rep_level));
+ }
RETURN_NOT_OK(HandleList(def_level, rep_level, offset + i));
}
return Status::OK();
@@ -249,28 +257,32 @@ Status LevelBuilder::VisitInline(const Array& array) {
class FileWriter::Impl {
public:
Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
Status NewRowGroup(int64_t chunk_size);
template <typename ParquetType, typename ArrowType>
Status TypedWriteBatch(ColumnWriter* writer, const std::shared_ptr<Array>& data,
- int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
+ int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels);
Status TypedWriteBatchConvertedNanos(ColumnWriter* writer,
- const std::shared_ptr<Array>& data, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels);
+ const std::shared_ptr<Array>& data,
+ int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels);
template <typename ParquetType, typename ArrowType>
Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
- const ArrowType& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels,
- const typename ArrowType::c_type* data_ptr);
+ const ArrowType& type, int64_t num_values,
+ int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels,
+ const typename ArrowType::c_type* data_ptr);
template <typename ParquetType, typename ArrowType>
Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, const ArrowType& type,
- int64_t num_values, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const typename ArrowType::c_type* data_ptr);
+ int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels,
+ const uint8_t* valid_bits, int64_t valid_bits_offset,
+ const typename ArrowType::c_type* data_ptr);
Status WriteColumnChunk(const Array& data);
Status Close();
@@ -290,7 +302,7 @@ class FileWriter::Impl {
};
FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
: pool_(pool),
data_buffer_(pool),
writer_(std::move(writer)),
@@ -298,15 +310,18 @@ FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writ
arrow_properties_(arrow_properties) {}
Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
- if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+ if (row_group_writer_ != nullptr) {
+ PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ }
PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
return Status::OK();
}
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
+ const std::shared_ptr<Array>& array,
+ int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels) {
using ArrowCType = typename ArrowType::c_type;
auto data = static_cast<const PrimitiveArray*>(array.get());
@@ -315,13 +330,13 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
// no nulls, just dump the data
- RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer,
- static_cast<const ArrowType&>(*array->type()), array->length(), num_levels,
- def_levels, rep_levels, data_ptr + data->offset())));
+ RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
+ writer, static_cast<const ArrowType&>(*array->type()), array->length(),
+ num_levels, def_levels, rep_levels, data_ptr + data->offset())));
} else {
const uint8_t* valid_bits = data->null_bitmap_data();
- RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer,
- static_cast<const ArrowType&>(*array->type()), data->length(), num_levels,
+ RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
+ writer, static_cast<const ArrowType&>(*array->type()), data->length(), num_levels,
def_levels, rep_levels, valid_bits, data->offset(), data_ptr + data->offset())));
}
PARQUET_CATCH_NOT_OK(writer->Close());
@@ -329,9 +344,9 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
}
template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
- const ArrowType& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels,
+Status FileWriter::Impl::WriteNonNullableBatch(
+ TypedColumnWriter<ParquetType>* writer, const ArrowType& type, int64_t num_values,
+ int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
const typename ArrowType::c_type* data_ptr) {
using ParquetCType = typename ParquetType::c_type;
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
@@ -416,9 +431,12 @@ NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
- const ArrowType& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr) {
+ const ArrowType& type, int64_t num_values,
+ int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels,
+ const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ const typename ArrowType::c_type* data_ptr) {
using ParquetCType = typename ParquetType::c_type;
RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
@@ -535,9 +553,9 @@ NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
-Status FileWriter::Impl::TypedWriteBatchConvertedNanos(ColumnWriter* column_writer,
- const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
+Status FileWriter::Impl::TypedWriteBatchConvertedNanos(
+ ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels) {
// Note that we can only use data_buffer_ here as we write timestamps with the fast
// path.
RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(int64_t)));
@@ -557,13 +575,14 @@ Status FileWriter::Impl::TypedWriteBatchConvertedNanos(ColumnWriter* column_writ
::arrow::timestamp(::arrow::TimeUnit::MICRO));
if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
// no nulls, just dump the data
- RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(writer, *type,
- array->length(), num_levels, def_levels, rep_levels, data_buffer_ptr)));
+ RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
+ writer, *type, array->length(), num_levels, def_levels, rep_levels,
+ data_buffer_ptr)));
} else {
const uint8_t* valid_bits = data->null_bitmap_data();
- RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(writer, *type,
- array->length(), num_levels, def_levels, rep_levels, valid_bits, data->offset(),
- data_buffer_ptr)));
+ RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
+ writer, *type, array->length(), num_levels, def_levels, rep_levels, valid_bits,
+ data->offset(), data_buffer_ptr)));
}
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
@@ -681,7 +700,9 @@ Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>
}
Status FileWriter::Impl::Close() {
- if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+ if (row_group_writer_ != nullptr) {
+ PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+ }
PARQUET_CATCH_NOT_OK(writer_->Close());
return Status::OK();
}
@@ -697,7 +718,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
int current_column_idx = row_group_writer_->current_column();
std::shared_ptr<::arrow::Schema> arrow_schema;
RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
- writer_->key_value_metadata(), &arrow_schema));
+ writer_->key_value_metadata(), &arrow_schema));
std::shared_ptr<Buffer> def_levels_buffer;
std::shared_ptr<Buffer> rep_levels_buffer;
int64_t values_offset;
@@ -707,9 +728,9 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
std::shared_ptr<Array> _values_array;
LevelBuilder level_builder(pool_);
- RETURN_NOT_OK(level_builder.GenerateLevels(data, arrow_schema->field(0), &values_offset,
- &values_type, &num_values, &num_levels, &def_levels_buffer, &rep_levels_buffer,
- &_values_array));
+ RETURN_NOT_OK(level_builder.GenerateLevels(
+ data, arrow_schema->field(0), &values_offset, &values_type, &num_values,
+ &num_levels, &def_levels_buffer, &rep_levels_buffer, &_values_array));
const int16_t* def_levels = nullptr;
if (def_levels_buffer) {
def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
@@ -747,8 +768,8 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
column_writer, values_array, num_levels, def_levels, rep_levels);
} else if (timestamp_type->unit() == ::arrow::TimeUnit::NANO) {
- return TypedWriteBatchConvertedNanos(
- column_writer, values_array, num_levels, def_levels, rep_levels);
+ return TypedWriteBatchConvertedNanos(column_writer, values_array, num_levels,
+ def_levels, rep_levels);
} else {
return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
column_writer, values_array, num_levels, def_levels, rep_levels);
@@ -786,35 +807,31 @@ Status FileWriter::WriteColumnChunk(const ::arrow::Array& array) {
return impl_->WriteColumnChunk(array);
}
-Status FileWriter::Close() {
- return impl_->Close();
-}
+Status FileWriter::Close() { return impl_->Close(); }
-MemoryPool* FileWriter::memory_pool() const {
- return impl_->pool_;
-}
+MemoryPool* FileWriter::memory_pool() const { return impl_->pool_; }
FileWriter::~FileWriter() {}
FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
: impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)) {}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
- const std::shared_ptr<OutputStream>& sink,
- const std::shared_ptr<WriterProperties>& properties,
- std::unique_ptr<FileWriter>* writer) {
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer) {
return Open(schema, pool, sink, properties, default_arrow_writer_properties(), writer);
}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
- const std::shared_ptr<OutputStream>& sink,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
- std::unique_ptr<FileWriter>* writer) {
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+ std::unique_ptr<FileWriter>* writer) {
std::shared_ptr<SchemaDescriptor> parquet_schema;
RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema,
- arrow_properties->support_deprecated_int96_timestamps()));
+ arrow_properties->support_deprecated_int96_timestamps()));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
@@ -826,18 +843,18 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
- const std::shared_ptr<::arrow::io::OutputStream>& sink,
- const std::shared_ptr<WriterProperties>& properties,
- std::unique_ptr<FileWriter>* writer) {
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer) {
auto wrapper = std::make_shared<ArrowOutputStream>(sink);
return Open(schema, pool, wrapper, properties, writer);
}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
- const std::shared_ptr<::arrow::io::OutputStream>& sink,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
- std::unique_ptr<FileWriter>* writer) {
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+ std::unique_ptr<FileWriter>* writer) {
auto wrapper = std::make_shared<ArrowOutputStream>(sink);
return Open(schema, pool, wrapper, properties, arrow_properties, writer);
}
@@ -868,20 +885,20 @@ Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
}
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
- const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
+ const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
std::unique_ptr<FileWriter> writer;
- RETURN_NOT_OK(FileWriter::Open(
- *table.schema(), pool, sink, properties, arrow_properties, &writer));
+ RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties,
+ arrow_properties, &writer));
RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
return writer->Close();
}
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
- const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
- const std::shared_ptr<WriterProperties>& properties,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
auto wrapper = std::make_shared<ArrowOutputStream>(sink);
return WriteTable(table, pool, wrapper, chunk_size, properties, arrow_properties);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 4f7d2b4..a74f263 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -85,26 +85,28 @@ std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_prope
class PARQUET_EXPORT FileWriter {
public:
FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
- default_arrow_writer_properties());
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties());
static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
- const std::shared_ptr<OutputStream>& sink,
- const std::shared_ptr<WriterProperties>& properties,
- std::unique_ptr<FileWriter>* writer);
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer);
- static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ static ::arrow::Status Open(
+ const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer);
static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
- const std::shared_ptr<::arrow::io::OutputStream>& sink,
- const std::shared_ptr<WriterProperties>& properties,
- std::unique_ptr<FileWriter>* writer);
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer);
- static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ static ::arrow::Status Open(
+ const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
@@ -135,16 +137,16 @@ class PARQUET_EXPORT FileWriter {
*
* The table shall only consist of columns of primitive type or of primitive lists.
*/
-::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table,
- ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
- int64_t chunk_size,
+::arrow::Status PARQUET_EXPORT WriteTable(
+ const ::arrow::Table& table, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties());
-::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table,
- ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
- int64_t chunk_size,
+::arrow::Status PARQUET_EXPORT WriteTable(
+ const ::arrow::Table& table, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties());
@@ -160,8 +162,8 @@ constexpr int64_t kNanosecondsPerDay = INT64_C(86400000000000);
/**
* Converts nanosecond timestamps to Impala (Int96) format
*/
-inline void NanosecondsToImpalaTimestamp(
- const int64_t nanoseconds, Int96* impala_timestamp) {
+inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
+ Int96* impala_timestamp) {
int64_t julian_days = (nanoseconds / kNanosecondsPerDay) + kJulianEpochOffsetDays;
(*impala_timestamp).value[2] = (uint32_t)julian_days;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b6f3caeb/src/parquet/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index 0a60367..2abf6fa 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -30,8 +30,9 @@ using schema::PrimitiveNode;
namespace benchmark {
std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
- ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema,
- const WriterProperties* properties) {
+ ColumnChunkMetaDataBuilder* metadata,
+ ColumnDescriptor* schema,
+ const WriterProperties* properties) {
std::unique_ptr<SerializedPageWriter> pager(
new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
return std::unique_ptr<Int64Writer>(new Int64Writer(
@@ -40,8 +41,8 @@ std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
auto node = PrimitiveNode::Make("int64", repetition, Type::INT64);
- return std::make_shared<ColumnDescriptor>(
- node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED);
+ return std::make_shared<ColumnDescriptor>(node, repetition != Repetition::REQUIRED,
+ repetition == Repetition::REPEATED);
}
void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {
@@ -70,8 +71,8 @@ static void BM_WriteInt64Column(::benchmark::State& state) {
InMemoryOutputStream stream;
std::unique_ptr<Int64Writer> writer = BuildWriter(
state.range(0), &stream, metadata.get(), schema.get(), properties.get());
- writer->WriteBatch(
- values.size(), definition_levels.data(), repetition_levels.data(), values.data());
+ writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
+ values.data());
writer->Close();
}
SetBytesProcessed(state, repetition);
@@ -83,8 +84,8 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Range(1024, 65536
BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Range(1024, 65536);
-std::unique_ptr<Int64Reader> BuildReader(
- std::shared_ptr<Buffer>& buffer, int64_t num_values, ColumnDescriptor* schema) {
+std::unique_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
+ int64_t num_values, ColumnDescriptor* schema) {
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
std::unique_ptr<SerializedPageReader> page_reader(
new SerializedPageReader(std::move(source), num_values, Compression::UNCOMPRESSED));
@@ -105,8 +106,8 @@ static void BM_ReadInt64Column(::benchmark::State& state) {
InMemoryOutputStream stream;
std::unique_ptr<Int64Writer> writer = BuildWriter(
state.range(0), &stream, metadata.get(), schema.get(), properties.get());
- writer->WriteBatch(
- values.size(), definition_levels.data(), repetition_levels.data(), values.data());
+ writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
+ values.data());
writer->Close();
std::shared_ptr<Buffer> src = stream.GetBuffer();
@@ -118,7 +119,7 @@ static void BM_ReadInt64Column(::benchmark::State& state) {
int64_t values_read = 0;
for (size_t i = 0; i < values.size(); i += values_read) {
reader->ReadBatch(values_out.size(), definition_levels_out.data(),
- repetition_levels_out.data(), values_out.data(), &values_read);
+ repetition_levels_out.data(), values_out.data(), &values_read);
}
}
SetBytesProcessed(state, repetition);
@@ -136,8 +137,8 @@ BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)
static void BM_RleEncoding(::benchmark::State& state) {
std::vector<int16_t> levels(state.range(0), 0);
int64_t n = 0;
- std::generate(
- levels.begin(), levels.end(), [&state, &n] { return (n++ % state.range(1)) == 0; });
+ std::generate(levels.begin(), levels.end(),
+ [&state, &n] { return (n++ % state.range(1)) == 0; });
int16_t max_level = 1;
int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
auto buffer_rle = std::make_shared<PoolBuffer>();
@@ -146,7 +147,7 @@ static void BM_RleEncoding(::benchmark::State& state) {
while (state.KeepRunning()) {
LevelEncoder level_encoder;
level_encoder.Init(Encoding::RLE, max_level, levels.size(),
- buffer_rle->mutable_data(), buffer_rle->size());
+ buffer_rle->mutable_data(), buffer_rle->size());
level_encoder.Encode(levels.size(), levels.data());
}
state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
@@ -159,14 +160,14 @@ static void BM_RleDecoding(::benchmark::State& state) {
LevelEncoder level_encoder;
std::vector<int16_t> levels(state.range(0), 0);
int64_t n = 0;
- std::generate(
- levels.begin(), levels.end(), [&state, &n] { return (n++ % state.range(1)) == 0; });
+ std::generate(levels.begin(), levels.end(),
+ [&state, &n] { return (n++ % state.range(1)) == 0; });
int16_t max_level = 1;
int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
auto buffer_rle = std::make_shared<PoolBuffer>();
PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t)));
level_encoder.Init(Encoding::RLE, max_level, levels.size(),
- buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
+ buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
level_encoder.Encode(levels.size(), levels.data());
reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len();