You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by uw...@apache.org on 2018/08/25 11:33:41 UTC
[parquet-cpp] branch master updated: PARQUET-1372: Add an API to
allow writing RowGroups based on size
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 80e110c PARQUET-1372: Add an API to allow writing RowGroups based on size
80e110c is described below
commit 80e110c823c5631ce4a4f0a5da486e759219f1e3
Author: Deepak Majeti <de...@hpe.com>
AuthorDate: Sat Aug 25 13:33:35 2018 +0200
PARQUET-1372: Add an API to allow writing RowGroups based on size
I split the changes into multiple commits to ease the review.
Used the example program to test the new API.
I will add unit tests once we converge on the API after review.
Thanks to @anatolishein for collaborating with the API design.
Author: Deepak Majeti <de...@hpe.com>
Closes #484 from majetideepak/PARQUET-1372 and squashes the following commits:
143ed51 [Deepak Majeti] improve comments
c10fe08 [Deepak Majeti] Add test
d12b10b [Deepak Majeti] Review comments
cb99b3f [Deepak Majeti] fix compiler warnings
e179a4c [Deepak Majeti] add example header
710bbe0 [Deepak Majeti] clang format
9e03004 [Deepak Majeti] reorg examples
410a3af [Deepak Majeti] remove flush_on_close
e148817 [Deepak Majeti] add BufferedPageWriter
26a52c1 [Deepak Majeti] clang format
20049c0 [Deepak Majeti] modify examples
9db26a2 [Deepak Majeti] Combine RowGroupWriter2 with RowGroupWriter
cb7d69c [Deepak Majeti] fix compiler errors
21642b3 [Deepak Majeti] clang format
530b835 [Deepak Majeti] example for RowGroupWriter2
0fc1f5c [Deepak Majeti] Extend Column Writer to flush pages on Close
f2f420d [Deepak Majeti] RowGroupWriter2, implementation that writes all columns at once
---
CMakeLists.txt | 4 +-
examples/low-level-api/CMakeLists.txt | 4 +
examples/low-level-api/reader-writer.cc | 60 +---
.../{reader-writer.cc => reader-writer2.cc} | 313 ++++++++++-----------
examples/low-level-api/reader_writer.h | 71 +++++
src/parquet/arrow/test-util.h | 7 +-
src/parquet/column_writer-test.cc | 1 -
src/parquet/column_writer.cc | 74 ++++-
src/parquet/column_writer.h | 17 +-
src/parquet/file-serialize-test.cc | 68 ++++-
src/parquet/file_writer.cc | 153 ++++++++--
src/parquet/file_writer.h | 33 ++-
12 files changed, 531 insertions(+), 274 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5b3c460..698f6d7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -458,7 +458,7 @@ add_custom_target(format-example
${BUILD_SUPPORT_DIR}/run_clang_format.py
${CLANG_FORMAT_BIN}
${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
- ${CMAKE_CURRENT_SOURCE_DIR}/examples/parquet-arrow)
+ ${CMAKE_CURRENT_SOURCE_DIR}/examples)
add_custom_target(format
DEPENDS format-example
@@ -474,7 +474,7 @@ add_custom_target(format
add_custom_target(check-format-examples ${BUILD_SUPPORT_DIR}/run_clang_format.py
${CLANG_FORMAT_BIN}
${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
- ${CMAKE_CURRENT_SOURCE_DIR}/examples/parquet-arrow 1)
+ ${CMAKE_CURRENT_SOURCE_DIR}/examples 1)
add_custom_target(check-format
DEPENDS check-format-examples
COMMAND
diff --git a/examples/low-level-api/CMakeLists.txt b/examples/low-level-api/CMakeLists.txt
index 721fa9a..64ba110 100644
--- a/examples/low-level-api/CMakeLists.txt
+++ b/examples/low-level-api/CMakeLists.txt
@@ -17,5 +17,9 @@
if (PARQUET_BUILD_EXECUTABLES)
add_executable(reader-writer reader-writer.cc)
+ add_executable(reader-writer2 reader-writer2.cc)
+ target_include_directories(reader-writer PRIVATE .)
+ target_include_directories(reader-writer2 PRIVATE .)
target_link_libraries(reader-writer parquet_static)
+ target_link_libraries(reader-writer2 parquet_static)
endif()
diff --git a/examples/low-level-api/reader-writer.cc b/examples/low-level-api/reader-writer.cc
index fb2ec77..09cd137 100644
--- a/examples/low-level-api/reader-writer.cc
+++ b/examples/low-level-api/reader-writer.cc
@@ -18,19 +18,16 @@
#include <cassert>
#include <fstream>
#include <iostream>
-#include <list>
#include <memory>
-#include <arrow/io/file.h>
-#include <arrow/util/logging.h>
-
-#include <parquet/api/reader.h>
-#include <parquet/api/writer.h>
+#include <reader_writer.h>
/*
* This example describes writing and reading Parquet Files in C++ and serves as a
* reference to the API.
* The file contains all the physical data types supported by Parquet.
+ * This example uses the RowGroupWriter API that supports writing RowGroups optimized for
+ * memory consumption
**/
/* Parquet is a structured columnar file format
@@ -46,56 +43,8 @@
**/
constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
-constexpr int FIXED_LENGTH = 10;
const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
-using parquet::Repetition;
-using parquet::Type;
-using parquet::LogicalType;
-using parquet::schema::PrimitiveNode;
-using parquet::schema::GroupNode;
-
-static std::shared_ptr<GroupNode> SetupSchema() {
- parquet::schema::NodeVector fields;
- // Create a primitive node named 'boolean_field' with type:BOOLEAN,
- // repetition:REQUIRED
- fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
- Type::BOOLEAN, LogicalType::NONE));
-
- // Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED,
- // logical type:TIME_MILLIS
- fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32,
- LogicalType::TIME_MILLIS));
-
- // Create a primitive node named 'int64_field' with type:INT64, repetition:REPEATED
- fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED, Type::INT64,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED, Type::INT96,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE,
- LogicalType::NONE));
-
- // Create a primitive node named 'ba_field' with type:BYTE_ARRAY, repetition:OPTIONAL
- fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL, Type::BYTE_ARRAY,
- LogicalType::NONE));
-
- // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
- // repetition:REQUIRED, field_length = FIXED_LENGTH
- fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE,
- FIXED_LENGTH));
-
- // Create a GroupNode named 'schema' using the primitive nodes defined above
- // This GroupNode is the root node of the schema tree
- return std::static_pointer_cast<GroupNode>(
- GroupNode::Make("schema", Repetition::REQUIRED, fields));
-}
-
int main(int argc, char** argv) {
/**********************************************************************************
PARQUET WRITER EXAMPLE
@@ -122,8 +71,7 @@ int main(int argc, char** argv) {
parquet::ParquetFileWriter::Open(out_file, schema, props);
// Append a RowGroup with a specific number of rows.
- parquet::RowGroupWriter* rg_writer =
- file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
+ parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
// Write the Bool column
parquet::BoolWriter* bool_writer =
diff --git a/examples/low-level-api/reader-writer.cc b/examples/low-level-api/reader-writer2.cc
similarity index 62%
copy from examples/low-level-api/reader-writer.cc
copy to examples/low-level-api/reader-writer2.cc
index fb2ec77..dded5fa 100644
--- a/examples/low-level-api/reader-writer.cc
+++ b/examples/low-level-api/reader-writer2.cc
@@ -18,19 +18,16 @@
#include <cassert>
#include <fstream>
#include <iostream>
-#include <list>
#include <memory>
-#include <arrow/io/file.h>
-#include <arrow/util/logging.h>
-
-#include <parquet/api/reader.h>
-#include <parquet/api/writer.h>
+#include <reader_writer.h>
/*
* This example describes writing and reading Parquet Files in C++ and serves as a
* reference to the API.
* The file contains all the physical data types supported by Parquet.
+ * This example uses the RowGroupWriter API that supports writing RowGroups based on a
+ * certain size
**/
/* Parquet is a structured columnar file format
@@ -45,56 +42,9 @@
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
**/
-constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
-constexpr int FIXED_LENGTH = 10;
-const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
-
-using parquet::Repetition;
-using parquet::Type;
-using parquet::LogicalType;
-using parquet::schema::PrimitiveNode;
-using parquet::schema::GroupNode;
-
-static std::shared_ptr<GroupNode> SetupSchema() {
- parquet::schema::NodeVector fields;
- // Create a primitive node named 'boolean_field' with type:BOOLEAN,
- // repetition:REQUIRED
- fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
- Type::BOOLEAN, LogicalType::NONE));
-
- // Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED,
- // logical type:TIME_MILLIS
- fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32,
- LogicalType::TIME_MILLIS));
-
- // Create a primitive node named 'int64_field' with type:INT64, repetition:REPEATED
- fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED, Type::INT64,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED, Type::INT96,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE,
- LogicalType::NONE));
-
- // Create a primitive node named 'ba_field' with type:BYTE_ARRAY, repetition:OPTIONAL
- fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL, Type::BYTE_ARRAY,
- LogicalType::NONE));
-
- // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
- // repetition:REQUIRED, field_length = FIXED_LENGTH
- fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE,
- FIXED_LENGTH));
-
- // Create a GroupNode named 'schema' using the primitive nodes defined above
- // This GroupNode is the root node of the schema tree
- return std::static_pointer_cast<GroupNode>(
- GroupNode::Make("schema", Repetition::REQUIRED, fields));
-}
+constexpr int NUM_ROWS = 2500000;
+constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024; // 16 MB
+const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet";
int main(int argc, char** argv) {
/**********************************************************************************
@@ -121,99 +71,118 @@ int main(int argc, char** argv) {
std::shared_ptr<parquet::ParquetFileWriter> file_writer =
parquet::ParquetFileWriter::Open(out_file, schema, props);
- // Append a RowGroup with a specific number of rows.
- parquet::RowGroupWriter* rg_writer =
- file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
+ // Append a BufferedRowGroup to keep the RowGroup open until a certain size
+ parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();
- // Write the Bool column
- parquet::BoolWriter* bool_writer =
- static_cast<parquet::BoolWriter*>(rg_writer->NextColumn());
- for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
- bool value = ((i % 2) == 0) ? true : false;
- bool_writer->WriteBatch(1, nullptr, nullptr, &value);
- }
+ int num_columns = file_writer->num_columns();
+ std::vector<int64_t> buffered_values_estimate(num_columns, 0);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ int64_t estimated_bytes = 0;
+ // Get the estimated size of the values that are not written to a page yet
+ for (int n = 0; n < num_columns; n++) {
+ estimated_bytes += buffered_values_estimate[n];
+ }
- // Write the Int32 column
- parquet::Int32Writer* int32_writer =
- static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
- for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
- int32_t value = i;
- int32_writer->WriteBatch(1, nullptr, nullptr, &value);
- }
+ // We need to consider the compressed pages
+ // as well as the values that are not compressed yet
+ if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
+ estimated_bytes) > ROW_GROUP_SIZE) {
+ rg_writer->Close();
+ std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
+ rg_writer = file_writer->AppendBufferedRowGroup();
+ }
- // Write the Int64 column. Each row has repeats twice.
- parquet::Int64Writer* int64_writer =
- static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
- for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) {
- int64_t value = i * 1000 * 1000;
- value *= 1000 * 1000;
+ int col_id = 0;
+ // Write the Bool column
+ parquet::BoolWriter* bool_writer =
+ static_cast<parquet::BoolWriter*>(rg_writer->column(col_id));
+ bool bool_value = ((i % 2) == 0) ? true : false;
+ bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value);
+ buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes();
+
+ // Write the Int32 column
+ col_id++;
+ parquet::Int32Writer* int32_writer =
+ static_cast<parquet::Int32Writer*>(rg_writer->column(col_id));
+ int32_t int32_value = i;
+ int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value);
+ buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes();
+
+ // Write the Int64 column. Each row has repeats twice.
+ col_id++;
+ parquet::Int64Writer* int64_writer =
+ static_cast<parquet::Int64Writer*>(rg_writer->column(col_id));
+ int64_t int64_value1 = 2 * i;
int16_t definition_level = 1;
int16_t repetition_level = 0;
- if ((i % 2) == 0) {
- repetition_level = 1; // start of a new record
- }
- int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value);
- }
-
- // Write the INT96 column.
- parquet::Int96Writer* int96_writer =
- static_cast<parquet::Int96Writer*>(rg_writer->NextColumn());
- for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
- parquet::Int96 value;
- value.value[0] = i;
- value.value[1] = i + 1;
- value.value[2] = i + 2;
- int96_writer->WriteBatch(1, nullptr, nullptr, &value);
- }
-
- // Write the Float column
- parquet::FloatWriter* float_writer =
- static_cast<parquet::FloatWriter*>(rg_writer->NextColumn());
- for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
- float value = static_cast<float>(i) * 1.1f;
- float_writer->WriteBatch(1, nullptr, nullptr, &value);
- }
-
- // Write the Double column
- parquet::DoubleWriter* double_writer =
- static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn());
- for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
- double value = i * 1.1111111;
- double_writer->WriteBatch(1, nullptr, nullptr, &value);
- }
-
- // Write the ByteArray column. Make every alternate values NULL
- parquet::ByteArrayWriter* ba_writer =
- static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
- for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
- parquet::ByteArray value;
+ int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value1);
+ int64_t int64_value2 = (2 * i + 1);
+ repetition_level = 1; // start of a new record
+ int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2);
+ buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes();
+
+ // Write the INT96 column.
+ col_id++;
+ parquet::Int96Writer* int96_writer =
+ static_cast<parquet::Int96Writer*>(rg_writer->column(col_id));
+ parquet::Int96 int96_value;
+ int96_value.value[0] = i;
+ int96_value.value[1] = i + 1;
+ int96_value.value[2] = i + 2;
+ int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value);
+ buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes();
+
+ // Write the Float column
+ col_id++;
+ parquet::FloatWriter* float_writer =
+ static_cast<parquet::FloatWriter*>(rg_writer->column(col_id));
+ float float_value = static_cast<float>(i) * 1.1f;
+ float_writer->WriteBatch(1, nullptr, nullptr, &float_value);
+ buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes();
+
+ // Write the Double column
+ col_id++;
+ parquet::DoubleWriter* double_writer =
+ static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id));
+ double double_value = i * 1.1111111;
+ double_writer->WriteBatch(1, nullptr, nullptr, &double_value);
+ buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes();
+
+ // Write the ByteArray column. Make every alternate values NULL
+ col_id++;
+ parquet::ByteArrayWriter* ba_writer =
+ static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
+ parquet::ByteArray ba_value;
char hello[FIXED_LENGTH] = "parquet";
hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
if (i % 2 == 0) {
int16_t definition_level = 1;
- value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
- value.len = FIXED_LENGTH;
- ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
+ ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
+ ba_value.len = FIXED_LENGTH;
+ ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
} else {
int16_t definition_level = 0;
ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
}
- }
+ buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
- // Write the FixedLengthByteArray column
- parquet::FixedLenByteArrayWriter* flba_writer =
- static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn());
- for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
- parquet::FixedLenByteArray value;
+ // Write the FixedLengthByteArray column
+ col_id++;
+ parquet::FixedLenByteArrayWriter* flba_writer =
+ static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id));
+ parquet::FixedLenByteArray flba_value;
char v = static_cast<char>(i);
char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
- value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
+ flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
- flba_writer->WriteBatch(1, nullptr, nullptr, &value);
+ flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value);
+ buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes();
}
+ // Close the RowGroupWriter
+ rg_writer->Close();
// Close the ParquetFileWriter
file_writer->Close();
@@ -236,34 +205,35 @@ int main(int argc, char** argv) {
// Get the File MetaData
std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
- // Get the number of RowGroups
int num_row_groups = file_metadata->num_row_groups();
- assert(num_row_groups == 1);
// Get the number of Columns
int num_columns = file_metadata->num_columns();
assert(num_columns == 8);
+ std::vector<int> col_row_counts(num_columns, 0);
+
// Iterate over all the RowGroups in the file
for (int r = 0; r < num_row_groups; ++r) {
// Get the RowGroup Reader
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);
+ assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE);
+
int64_t values_read = 0;
int64_t rows_read = 0;
int16_t definition_level;
int16_t repetition_level;
- int i;
std::shared_ptr<parquet::ColumnReader> column_reader;
+ int col_id = 0;
// Get the Column Reader for the boolean column
- column_reader = row_group_reader->Column(0);
+ column_reader = row_group_reader->Column(col_id);
parquet::BoolReader* bool_reader =
static_cast<parquet::BoolReader*>(column_reader.get());
// Read all the rows in the column
- i = 0;
while (bool_reader->HasNext()) {
bool value;
// Read one value at a time. The number of rows read is returned. values_read
@@ -274,17 +244,17 @@ int main(int argc, char** argv) {
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
- bool expected_value = ((i % 2) == 0) ? true : false;
+ bool expected_value = ((col_row_counts[col_id] % 2) == 0) ? true : false;
assert(value == expected_value);
- i++;
+ col_row_counts[col_id]++;
}
// Get the Column Reader for the Int32 column
- column_reader = row_group_reader->Column(1);
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
parquet::Int32Reader* int32_reader =
static_cast<parquet::Int32Reader*>(column_reader.get());
// Read all the rows in the column
- i = 0;
while (int32_reader->HasNext()) {
int32_t value;
// Read one value at a time. The number of rows read is returned. values_read
@@ -295,16 +265,16 @@ int main(int argc, char** argv) {
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
- assert(value == i);
- i++;
+ assert(value == col_row_counts[col_id]);
+ col_row_counts[col_id]++;
}
// Get the Column Reader for the Int64 column
- column_reader = row_group_reader->Column(2);
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
parquet::Int64Reader* int64_reader =
static_cast<parquet::Int64Reader*>(column_reader.get());
// Read all the rows in the column
- i = 0;
while (int64_reader->HasNext()) {
int64_t value;
// Read one value at a time. The number of rows read is returned. values_read
@@ -316,23 +286,22 @@ int main(int argc, char** argv) {
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
- int64_t expected_value = i * 1000 * 1000;
- expected_value *= 1000 * 1000;
+ int64_t expected_value = col_row_counts[col_id];
assert(value == expected_value);
- if ((i % 2) == 0) {
- assert(repetition_level == 1);
- } else {
+ if ((col_row_counts[col_id] % 2) == 0) {
assert(repetition_level == 0);
+ } else {
+ assert(repetition_level == 1);
}
- i++;
+ col_row_counts[col_id]++;
}
// Get the Column Reader for the Int96 column
- column_reader = row_group_reader->Column(3);
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
parquet::Int96Reader* int96_reader =
static_cast<parquet::Int96Reader*>(column_reader.get());
// Read all the rows in the column
- i = 0;
while (int96_reader->HasNext()) {
parquet::Int96 value;
// Read one value at a time. The number of rows read is returned. values_read
@@ -344,21 +313,21 @@ int main(int argc, char** argv) {
assert(values_read == 1);
// Verify the value written
parquet::Int96 expected_value;
- expected_value.value[0] = i;
- expected_value.value[1] = i + 1;
- expected_value.value[2] = i + 2;
+ expected_value.value[0] = col_row_counts[col_id];
+ expected_value.value[1] = col_row_counts[col_id] + 1;
+ expected_value.value[2] = col_row_counts[col_id] + 2;
for (int j = 0; j < 3; j++) {
assert(value.value[j] == expected_value.value[j]);
}
- i++;
+ col_row_counts[col_id]++;
}
// Get the Column Reader for the Float column
- column_reader = row_group_reader->Column(4);
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
parquet::FloatReader* float_reader =
static_cast<parquet::FloatReader*>(column_reader.get());
// Read all the rows in the column
- i = 0;
while (float_reader->HasNext()) {
float value;
// Read one value at a time. The number of rows read is returned. values_read
@@ -369,17 +338,17 @@ int main(int argc, char** argv) {
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
- float expected_value = static_cast<float>(i) * 1.1f;
+ float expected_value = static_cast<float>(col_row_counts[col_id]) * 1.1f;
assert(value == expected_value);
- i++;
+ col_row_counts[col_id]++;
}
// Get the Column Reader for the Double column
- column_reader = row_group_reader->Column(5);
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
parquet::DoubleReader* double_reader =
static_cast<parquet::DoubleReader*>(column_reader.get());
// Read all the rows in the column
- i = 0;
while (double_reader->HasNext()) {
double value;
// Read one value at a time. The number of rows read is returned. values_read
@@ -390,17 +359,17 @@ int main(int argc, char** argv) {
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
- double expected_value = i * 1.1111111;
+ double expected_value = col_row_counts[col_id] * 1.1111111;
assert(value == expected_value);
- i++;
+ col_row_counts[col_id]++;
}
// Get the Column Reader for the ByteArray column
- column_reader = row_group_reader->Column(6);
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
parquet::ByteArrayReader* ba_reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());
// Read all the rows in the column
- i = 0;
while (ba_reader->HasNext()) {
parquet::ByteArray value;
// Read one value at a time. The number of rows read is returned. values_read
@@ -411,10 +380,10 @@ int main(int argc, char** argv) {
assert(rows_read == 1);
// Verify the value written
char expected_value[FIXED_LENGTH] = "parquet";
- expected_value[7] = static_cast<char>('0' + i / 100);
- expected_value[8] = static_cast<char>('0' + (i / 10) % 10);
- expected_value[9] = static_cast<char>('0' + i % 10);
- if (i % 2 == 0) { // only alternate values exist
+ expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] / 100);
+ expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] / 10) % 10);
+ expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] % 10);
+ if (col_row_counts[col_id] % 2 == 0) { // only alternate values exist
// There are no NULL values in the rows written
assert(values_read == 1);
assert(value.len == FIXED_LENGTH);
@@ -425,15 +394,15 @@ int main(int argc, char** argv) {
assert(values_read == 0);
assert(definition_level == 0);
}
- i++;
+ col_row_counts[col_id]++;
}
// Get the Column Reader for the FixedLengthByteArray column
- column_reader = row_group_reader->Column(7);
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
parquet::FixedLenByteArrayReader* flba_reader =
static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
// Read all the rows in the column
- i = 0;
while (flba_reader->HasNext()) {
parquet::FixedLenByteArray value;
// Read one value at a time. The number of rows read is returned. values_read
@@ -444,10 +413,10 @@ int main(int argc, char** argv) {
// There are no NULL values in the rows written
assert(values_read == 1);
// Verify the value written
- char v = static_cast<char>(i);
+ char v = static_cast<char>(col_row_counts[col_id]);
char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
- i++;
+ col_row_counts[col_id]++;
}
}
} catch (const std::exception& e) {
diff --git a/examples/low-level-api/reader_writer.h b/examples/low-level-api/reader_writer.h
new file mode 100644
index 0000000..3fda0cf
--- /dev/null
+++ b/examples/low-level-api/reader_writer.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/io/file.h>
+#include <arrow/util/logging.h>
+
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+
+using parquet::LogicalType;
+using parquet::Repetition;
+using parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+constexpr int FIXED_LENGTH = 10;
+
+static std::shared_ptr<GroupNode> SetupSchema() {
+ parquet::schema::NodeVector fields;
+ // Create a primitive node named 'boolean_field' with type:BOOLEAN,
+ // repetition:REQUIRED
+ fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
+ Type::BOOLEAN, LogicalType::NONE));
+
+ // Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED,
+ // logical type:TIME_MILLIS
+ fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32,
+ LogicalType::TIME_MILLIS));
+
+ // Create a primitive node named 'int64_field' with type:INT64, repetition:REPEATED
+ fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED, Type::INT64,
+ LogicalType::NONE));
+
+ fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED, Type::INT96,
+ LogicalType::NONE));
+
+ fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT,
+ LogicalType::NONE));
+
+ fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE,
+ LogicalType::NONE));
+
+ // Create a primitive node named 'ba_field' with type:BYTE_ARRAY, repetition:OPTIONAL
+ fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL, Type::BYTE_ARRAY,
+ LogicalType::NONE));
+
+ // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
+ // repetition:REQUIRED, field_length = FIXED_LENGTH
+ fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
+ Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE,
+ FIXED_LENGTH));
+
+ // Create a GroupNode named 'schema' using the primitive nodes defined above
+ // This GroupNode is the root node of the schema tree
+ return std::static_pointer_cast<GroupNode>(
+ GroupNode::Make("schema", Repetition::REQUIRED, fields));
+}
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 19837db..631bb71 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -79,8 +79,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA
size_t size, std::shared_ptr<Array>* out) {
using c_type = typename ArrowType::c_type;
std::vector<c_type> values;
- ::arrow::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1),
- &values);
+ ::arrow::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1), &values);
::arrow::NumericBuilder<ArrowType> builder;
RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
return builder.Finish(out);
@@ -200,8 +199,8 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
using c_type = typename ArrowType::c_type;
std::vector<c_type> values;
- ::arrow::random_real(size, seed, static_cast<c_type>(-1e10),
- static_cast<c_type>(1e10), &values);
+ ::arrow::random_real(size, seed, static_cast<c_type>(-1e10), static_cast<c_type>(1e10),
+ &values);
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index dd89281..e87d549 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -47,7 +47,6 @@ const int LARGE_SIZE = 100000;
const int VERY_LARGE_SIZE = 400000;
#endif
-
template <typename TestType>
class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
public:
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 934530c..a65bda8 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -249,6 +249,16 @@ class SerializedPageWriter : public PageWriter {
bool has_compressor() override { return (compressor_ != nullptr); }
+ int64_t num_values() { return num_values_; }
+
+ int64_t dictionary_page_offset() { return dictionary_page_offset_; }
+
+ int64_t data_page_offset() { return data_page_offset_; }
+
+ int64_t total_compressed_size() { return total_compressed_size_; }
+
+ int64_t total_uncompressed_size() { return total_uncompressed_size_; }
+
private:
OutputStream* sink_;
ColumnChunkMetaDataBuilder* metadata_;
@@ -263,11 +273,64 @@ class SerializedPageWriter : public PageWriter {
std::unique_ptr<::arrow::Codec> compressor_;
};
+// This implementation of the PageWriter writes to the final sink on Close.
+class BufferedPageWriter : public PageWriter {
+ public:
+ BufferedPageWriter(OutputStream* sink, Compression::type codec,
+ ColumnChunkMetaDataBuilder* metadata,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ : final_sink_(sink),
+ metadata_(metadata),
+ in_memory_sink_(new InMemoryOutputStream(pool)),
+ pager_(new SerializedPageWriter(in_memory_sink_.get(), codec, metadata, pool)) {}
+
+ int64_t WriteDictionaryPage(const DictionaryPage& page) override {
+ return pager_->WriteDictionaryPage(page);
+ }
+
+ void Close(bool has_dictionary, bool fallback) override {
+ // index_page_offset = -1 since index pages are not supported
+ metadata_->Finish(
+ pager_->num_values(), pager_->dictionary_page_offset() + final_sink_->Tell(), -1,
+ pager_->data_page_offset() + final_sink_->Tell(), pager_->total_compressed_size(),
+ pager_->total_uncompressed_size(), has_dictionary, fallback);
+
+ // Write metadata at end of column chunk
+ metadata_->WriteTo(in_memory_sink_.get());
+
+ // flush everything to the serialized sink
+ auto buffer = in_memory_sink_->GetBuffer();
+ final_sink_->Write(buffer->data(), buffer->size());
+ }
+
+ int64_t WriteDataPage(const CompressedDataPage& page) override {
+ return pager_->WriteDataPage(page);
+ }
+
+ void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
+ pager_->Compress(src_buffer, dest_buffer);
+ }
+
+ bool has_compressor() override { return pager_->has_compressor(); }
+
+ private:
+ OutputStream* final_sink_;
+ ColumnChunkMetaDataBuilder* metadata_;
+ std::unique_ptr<InMemoryOutputStream> in_memory_sink_;
+ std::unique_ptr<SerializedPageWriter> pager_;
+};
+
std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink, Compression::type codec,
ColumnChunkMetaDataBuilder* metadata,
- ::arrow::MemoryPool* pool) {
- return std::unique_ptr<PageWriter>(
- new SerializedPageWriter(sink, codec, metadata, pool));
+ ::arrow::MemoryPool* pool,
+ bool buffered_row_group) {
+ if (buffered_row_group) {
+ return std::unique_ptr<PageWriter>(
+ new BufferedPageWriter(sink, codec, metadata, pool));
+ } else {
+ return std::unique_ptr<PageWriter>(
+ new SerializedPageWriter(sink, codec, metadata, pool));
+ }
}
// ----------------------------------------------------------------------
@@ -294,6 +357,7 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
num_buffered_encoded_values_(0),
rows_written_(0),
total_bytes_written_(0),
+ total_compressed_bytes_(0),
closed_(false),
fallback_(false) {
definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
@@ -404,6 +468,7 @@ void ColumnWriter::AddDataPage() {
CompressedDataPage page(compressed_data_copy,
static_cast<int32_t>(num_buffered_values_), encoding_,
Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
+ total_compressed_bytes_ += page.size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page));
} else { // Eagerly write pages
CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
@@ -432,7 +497,7 @@ int64_t ColumnWriter::Close() {
FlushBufferedDataPages();
EncodedStatistics chunk_statistics = GetChunkStatistics();
- // Write stats only if the column has atleast one row written
+ // Write stats only if the column has at least one row written
// From parquet-mr
// Don't write stats larger than the max size rather than truncating. The
// rationale is that some engines may use the minimum value in the page as
@@ -459,6 +524,7 @@ void ColumnWriter::FlushBufferedDataPages() {
WriteDataPage(data_pages_[i]);
}
data_pages_.clear();
+ total_compressed_bytes_ = 0;
}
// ----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 6b84748..1ba428a 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -75,7 +75,8 @@ class PageWriter {
static std::unique_ptr<PageWriter> Open(
OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
+ bool buffered_row_group = false);
// The Column Writer decides if dictionary encoding is used if set and
// if the dictionary encoding has fallen back to default encoding on reaching dictionary
@@ -117,6 +118,12 @@ class PARQUET_EXPORT ColumnWriter {
int64_t rows_written() const { return rows_written_; }
+ // Only considers the size of the compressed pages + page header
+ // Some values might still be buffered and not written to a page yet
+ int64_t total_compressed_bytes() const { return total_compressed_bytes_; }
+
+ int64_t total_bytes_written() const { return total_bytes_written_; }
+
const WriterProperties* properties() { return properties_; }
protected:
@@ -192,6 +199,9 @@ class PARQUET_EXPORT ColumnWriter {
// Records the total number of bytes written by the serializer
int64_t total_bytes_written_;
+ // Records the current number of compressed bytes in a column
+ int64_t total_compressed_bytes_;
+
// Flag to check if the Writer has been closed
bool closed_;
@@ -258,6 +268,11 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset, const T* values);
+ // Estimated size of the values that are not written to a page yet
+ int64_t EstimatedBufferedValueBytes() const {
+ return current_encoder_->EstimatedDataEncodedSize();
+ }
+
protected:
std::shared_ptr<Buffer> GetValuesBuffer() override {
return current_encoder_->FlushValues();
diff --git a/src/parquet/file-serialize-test.cc b/src/parquet/file-serialize-test.cc
index 1993404..750faa2 100644
--- a/src/parquet/file-serialize-test.cc
+++ b/src/parquet/file-serialize-test.cc
@@ -41,8 +41,9 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
void SetUp() {
num_columns_ = 4;
- num_rowgroups_ = 2;
+ num_rowgroups_ = 4;
rows_per_rowgroup_ = 50;
+ rows_per_batch_ = 10;
this->SetUpSchema(Repetition::OPTIONAL, num_columns_);
}
@@ -50,6 +51,7 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
int num_columns_;
int num_rowgroups_;
int rows_per_rowgroup_;
+ int rows_per_batch_;
void FileSerializeTest(Compression::type codec_type) {
std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
@@ -63,20 +65,44 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();
auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
- for (int rg = 0; rg < num_rowgroups_; ++rg) {
+ this->GenerateData(rows_per_rowgroup_);
+ for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
RowGroupWriter* row_group_writer;
row_group_writer = file_writer->AppendRowGroup();
- this->GenerateData(rows_per_rowgroup_);
for (int col = 0; col < num_columns_; ++col) {
auto column_writer =
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
column_writer->WriteBatch(rows_per_rowgroup_, this->def_levels_.data(), nullptr,
this->values_ptr_);
column_writer->Close();
+ // Ensure column() API which is specific to BufferedRowGroup cannot be called
+ ASSERT_THROW(row_group_writer->column(col), ParquetException);
}
row_group_writer->Close();
}
+ // Write half BufferedRowGroups
+ for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
+ RowGroupWriter* row_group_writer;
+ row_group_writer = file_writer->AppendBufferedRowGroup();
+ for (int batch = 0; batch < (rows_per_rowgroup_ / rows_per_batch_); ++batch) {
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+ column_writer->WriteBatch(
+ rows_per_batch_, this->def_levels_.data() + (batch * rows_per_batch_),
+ nullptr, this->values_ptr_ + (batch * rows_per_batch_));
+ // Ensure NextColumn() API which is specific to RowGroup cannot be called
+ ASSERT_THROW(row_group_writer->NextColumn(), ParquetException);
+ }
+ }
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+ column_writer->Close();
+ }
+ row_group_writer->Close();
+ }
file_writer->Close();
auto buffer = sink->GetBuffer();
@@ -137,6 +163,30 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
file_writer->Close();
}
+ void UnequalNumRowsBuffered(int64_t max_rows,
+ const std::vector<int64_t> rows_per_column) {
+ std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+ auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+
+ std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+
+ auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
+
+ RowGroupWriter* row_group_writer;
+ row_group_writer = file_writer->AppendBufferedRowGroup();
+
+ this->GenerateData(max_rows);
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+ column_writer->WriteBatch(rows_per_column[col], this->def_levels_.data(), nullptr,
+ this->values_ptr_);
+ column_writer->Close();
+ }
+ row_group_writer->Close();
+ file_writer->Close();
+ }
+
void RepeatedUnequalRows() {
// Optional and repeated, so definition and repetition levels
this->SetUpSchema(Repetition::REPEATED);
@@ -186,15 +236,23 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
RowGroupWriter* row_group_writer;
- row_group_writer = file_writer->AppendRowGroup();
+ row_group_writer = file_writer->AppendRowGroup();
for (int col = 0; col < num_columns_; ++col) {
auto column_writer =
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
column_writer->Close();
}
+ row_group_writer->Close();
+ row_group_writer = file_writer->AppendBufferedRowGroup();
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+ column_writer->Close();
+ }
row_group_writer->Close();
+
file_writer->Close();
}
};
@@ -212,11 +270,13 @@ TYPED_TEST(TestSerialize, SmallFileUncompressed) {
TYPED_TEST(TestSerialize, TooFewRows) {
std::vector<int64_t> num_rows = {100, 100, 100, 99};
ASSERT_THROW(this->UnequalNumRows(100, num_rows), ParquetException);
+ ASSERT_THROW(this->UnequalNumRowsBuffered(100, num_rows), ParquetException);
}
TYPED_TEST(TestSerialize, TooManyRows) {
std::vector<int64_t> num_rows = {100, 100, 100, 101};
ASSERT_THROW(this->UnequalNumRows(101, num_rows), ParquetException);
+ ASSERT_THROW(this->UnequalNumRowsBuffered(101, num_rows), ParquetException);
}
TYPED_TEST(TestSerialize, ZeroRows) { ASSERT_NO_THROW(this->ZeroRowsRowGroup()); }
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
index 9b2d9b0..30673c5 100644
--- a/src/parquet/file_writer.cc
+++ b/src/parquet/file_writer.cc
@@ -17,6 +17,8 @@
#include "parquet/file_writer.h"
+#include <vector>
+
#include "parquet/column_writer.h"
#include "parquet/schema-internal.h"
#include "parquet/schema.h"
@@ -47,12 +49,28 @@ void RowGroupWriter::Close() {
ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
+ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); }
+
+int64_t RowGroupWriter::total_compressed_bytes() const {
+ return contents_->total_compressed_bytes();
+}
+
+int64_t RowGroupWriter::total_bytes_written() const {
+ return contents_->total_bytes_written();
+}
+
int RowGroupWriter::current_column() { return contents_->current_column(); }
int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
+inline void ThrowRowsMisMatchError(int col, int64_t prev, int64_t curr) {
+ std::stringstream ss;
+ ss << "Column " << col << " had " << curr << " while previous column had " << prev;
+ throw ParquetException(ss.str());
+}
+
// ----------------------------------------------------------------------
// RowGroupSerializer
@@ -60,34 +78,45 @@ int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
class RowGroupSerializer : public RowGroupWriter::Contents {
public:
RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
- const WriterProperties* properties)
+ const WriterProperties* properties, bool buffered_row_group = false)
: sink_(sink),
metadata_(metadata),
properties_(properties),
total_bytes_written_(0),
closed_(false),
current_column_index_(0),
- num_rows_(-1) {}
+ num_rows_(0),
+ buffered_row_group_(buffered_row_group) {
+ if (buffered_row_group) {
+ InitColumns();
+ } else {
+ column_writers_.push_back(nullptr);
+ }
+ }
int num_columns() const override { return metadata_->num_columns(); }
int64_t num_rows() const override {
- if (current_column_writer_) {
- CheckRowsWritten();
- }
- return num_rows_ < 0 ? 0 : num_rows_;
+ CheckRowsWritten();
+ // CheckRowsWritten ensures num_rows_ is set correctly
+ return num_rows_;
}
ColumnWriter* NextColumn() override {
- if (current_column_writer_) {
+ if (buffered_row_group_) {
+ throw ParquetException(
+ "NextColumn() is not supported when a RowGroup is written by size");
+ }
+
+ if (column_writers_[0]) {
CheckRowsWritten();
}
// Throws an error if more columns are being written
auto col_meta = metadata_->NextColumnChunk();
- if (current_column_writer_) {
- total_bytes_written_ += current_column_writer_->Close();
+ if (column_writers_[0]) {
+ total_bytes_written_ += column_writers_[0]->Close();
}
++current_column_index_;
@@ -96,23 +125,60 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
std::unique_ptr<PageWriter> pager =
PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
properties_->memory_pool());
- current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
- return current_column_writer_.get();
+ column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_);
+ return column_writers_[0].get();
+ }
+
+ ColumnWriter* column(int i) override {
+ if (!buffered_row_group_) {
+ throw ParquetException(
+ "column() is only supported when a BufferedRowGroup is being written");
+ }
+
+ if (i >= 0 && i < static_cast<int>(column_writers_.size())) {
+ return column_writers_[i].get();
+ }
+ return nullptr;
}
int current_column() const override { return metadata_->current_column(); }
+ int64_t total_compressed_bytes() const override {
+ int64_t total_compressed_bytes = 0;
+ for (size_t i = 0; i < column_writers_.size(); i++) {
+ if (column_writers_[i]) {
+ total_compressed_bytes += column_writers_[i]->total_compressed_bytes();
+ }
+ }
+ return total_compressed_bytes;
+ }
+
+ int64_t total_bytes_written() const override {
+ int64_t total_bytes_written = 0;
+ for (size_t i = 0; i < column_writers_.size(); i++) {
+ if (column_writers_[i]) {
+ total_bytes_written += column_writers_[i]->total_bytes_written();
+ }
+ }
+ return total_bytes_written;
+ }
+
void Close() override {
if (!closed_) {
closed_ = true;
+ CheckRowsWritten();
- if (current_column_writer_) {
- CheckRowsWritten();
- total_bytes_written_ += current_column_writer_->Close();
- current_column_writer_.reset();
+ for (size_t i = 0; i < column_writers_.size(); i++) {
+ if (column_writers_[i]) {
+ total_bytes_written_ += column_writers_[i]->Close();
+ column_writers_[i].reset();
+ }
}
+ column_writers_.clear();
+
// Ensures all columns have been written
+ metadata_->set_num_rows(num_rows_);
metadata_->Finish(total_bytes_written_);
}
}
@@ -125,21 +191,43 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
bool closed_;
int current_column_index_;
mutable int64_t num_rows_;
+ bool buffered_row_group_;
void CheckRowsWritten() const {
- int64_t current_rows = current_column_writer_->rows_written();
- if (num_rows_ < 0) {
- num_rows_ = current_rows;
- metadata_->set_num_rows(current_rows);
- } else if (num_rows_ != current_rows) {
- std::stringstream ss;
- ss << "Column " << current_column_index_ << " had " << current_rows
- << " while previous column had " << num_rows_;
- throw ParquetException(ss.str());
+ // verify when only one column is written at a time
+ if (!buffered_row_group_ && column_writers_.size() > 0 && column_writers_[0]) {
+ int64_t current_col_rows = column_writers_[0]->rows_written();
+ if (num_rows_ == 0) {
+ num_rows_ = current_col_rows;
+ } else if (num_rows_ != current_col_rows) {
+ ThrowRowsMisMatchError(current_column_index_, current_col_rows, num_rows_);
+ }
+ } else if (buffered_row_group_ &&
+ column_writers_.size() > 0) { // when buffered_row_group = true
+ int64_t current_col_rows = column_writers_[0]->rows_written();
+ for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) {
+ int64_t current_col_rows_i = column_writers_[i]->rows_written();
+ if (current_col_rows != current_col_rows_i) {
+ ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows);
+ }
+ }
+ num_rows_ = current_col_rows;
+ }
+ }
+
+ void InitColumns() {
+ for (int i = 0; i < num_columns(); i++) {
+ auto col_meta = metadata_->NextColumnChunk();
+ const ColumnDescriptor* column_descr = col_meta->descr();
+ std::unique_ptr<PageWriter> pager =
+ PageWriter::Open(sink_, properties_->compression(column_descr->path()),
+ col_meta, properties_->memory_pool(), buffered_row_group_);
+ column_writers_.push_back(
+ ColumnWriter::Make(col_meta, std::move(pager), properties_));
}
}
- std::shared_ptr<ColumnWriter> current_column_writer_;
+ std::vector<std::shared_ptr<ColumnWriter>> column_writers_;
};
// ----------------------------------------------------------------------
@@ -187,18 +275,22 @@ class FileSerializer : public ParquetFileWriter::Contents {
return properties_;
}
- RowGroupWriter* AppendRowGroup() override {
+ RowGroupWriter* AppendRowGroup(bool buffered_row_group) {
if (row_group_writer_) {
row_group_writer_->Close();
}
num_row_groups_++;
auto rg_metadata = metadata_->AppendRowGroup();
- std::unique_ptr<RowGroupWriter::Contents> contents(
- new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
+ std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
+ sink_.get(), rg_metadata, properties_.get(), buffered_row_group));
row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
return row_group_writer_.get();
}
+ RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); }
+
+ RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }
+
~FileSerializer() override {
try {
Close();
@@ -227,6 +319,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
int num_row_groups_;
int64_t num_rows_;
std::unique_ptr<FileMetaDataBuilder> metadata_;
+ // Only one of the row group writers is active at a time
std::unique_ptr<RowGroupWriter> row_group_writer_;
void StartFile() {
@@ -311,6 +404,10 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
return contents_->AppendRowGroup();
}
+RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() {
+ return contents_->AppendBufferedRowGroup();
+}
+
RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
return AppendRowGroup();
}
diff --git a/src/parquet/file_writer.h b/src/parquet/file_writer.h
index de17982..cdfe06c 100644
--- a/src/parquet/file_writer.h
+++ b/src/parquet/file_writer.h
@@ -49,15 +49,25 @@ class PARQUET_EXPORT RowGroupWriter {
virtual int num_columns() const = 0;
virtual int64_t num_rows() const = 0;
+ // to be used only with ParquetFileWriter::AppendRowGroup
virtual ColumnWriter* NextColumn() = 0;
+ // to be used only with ParquetFileWriter::AppendBufferedRowGroup
+ virtual ColumnWriter* column(int i) = 0;
+
virtual int current_column() const = 0;
virtual void Close() = 0;
+
+ // total bytes written by the page writer
+ virtual int64_t total_bytes_written() const = 0;
+ // total bytes still compressed but not written
+ virtual int64_t total_compressed_bytes() const = 0;
};
explicit RowGroupWriter(std::unique_ptr<Contents> contents);
/// Construct a ColumnWriter for the indicated row group-relative column.
///
+ /// To be used only with ParquetFileWriter::AppendRowGroup
/// Ownership is solely within the RowGroupWriter. The ColumnWriter is only
/// valid until the next call to NextColumn or Close. As the contents are
/// directly written to the sink, once a new column is started, the contents
@@ -69,11 +79,22 @@ class PARQUET_EXPORT RowGroupWriter {
int num_columns() const;
+ /// Construct a ColumnWriter for the indicated row group column.
+ ///
+ /// To be used only with ParquetFileWriter::AppendBufferedRowGroup
+ /// Ownership is solely within the RowGroupWriter. The ColumnWriter is
+ /// valid until Close. The contents are buffered in memory and written to sink
+ /// on Close.
+ ColumnWriter* column(int i);
+
/**
* Number of rows that shall be written as part of this RowGroup.
*/
int64_t num_rows() const;
+ int64_t total_bytes_written() const;
+ int64_t total_compressed_bytes() const;
+
private:
// Holds a pointer to an instance of Contents implementation
std::unique_ptr<Contents> contents_;
@@ -101,6 +122,7 @@ class PARQUET_EXPORT ParquetFileWriter {
RowGroupWriter* AppendRowGroup(int64_t num_rows);
virtual RowGroupWriter* AppendRowGroup() = 0;
+ virtual RowGroupWriter* AppendBufferedRowGroup() = 0;
virtual int64_t num_rows() const = 0;
virtual int num_columns() const = 0;
@@ -142,7 +164,7 @@ class PARQUET_EXPORT ParquetFileWriter {
// Construct a RowGroupWriter for the indicated number of rows.
//
// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
- // until the next call to AppendRowGroup or Close.
+ // until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
// @param num_rows The number of rows that are stored in the new RowGroup
//
// \deprecated Since 1.3.0
@@ -151,9 +173,16 @@ class PARQUET_EXPORT ParquetFileWriter {
/// Construct a RowGroupWriter with an arbitrary number of rows.
///
/// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
- /// until the next call to AppendRowGroup or Close.
+ /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
RowGroupWriter* AppendRowGroup();
+ /// Construct a RowGroupWriter that buffers all the values until the RowGroup is ready.
+ /// Use this if you want to write a RowGroup based on a certain size
+ ///
+ /// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
+ /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
+ RowGroupWriter* AppendBufferedRowGroup();
+
/// Number of columns.
///
/// This number is fixed during the lifetime of the writer as it is determined via