You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/06/29 18:57:15 UTC
arrow git commit: ARROW-215: Support other integer types and strings
in Parquet I/O
Repository: arrow
Updated Branches:
refs/heads/master ef9083029 -> 2f52cf4ee
ARROW-215: Support other integer types and strings in Parquet I/O
Change-Id: I72c6c82bc38c895a04172531bebbc78d4fb08732
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2f52cf4e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2f52cf4e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2f52cf4e
Branch: refs/heads/master
Commit: 2f52cf4eed1033d1bf1f043d9063e462e60d6605
Parents: ef90830
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sun Jun 12 11:48:10 2016 +0200
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Tue Jun 28 21:32:32 2016 +0200
----------------------------------------------------------------------
cpp/src/arrow/parquet/parquet-io-test.cc | 461 ++++++++++++++--------
cpp/src/arrow/parquet/parquet-schema-test.cc | 4 +-
cpp/src/arrow/parquet/reader.cc | 160 +++++++-
cpp/src/arrow/parquet/schema.cc | 47 ++-
cpp/src/arrow/parquet/schema.h | 9 +-
cpp/src/arrow/parquet/test-util.h | 136 ++++++-
cpp/src/arrow/parquet/writer.cc | 234 +++++++++--
cpp/src/arrow/parquet/writer.h | 9 +-
cpp/src/arrow/test-util.h | 2 +
cpp/src/arrow/types/primitive.cc | 5 +
python/pyarrow/includes/parquet.pxd | 13 +-
python/pyarrow/parquet.pyx | 22 +-
python/pyarrow/tests/test_parquet.py | 43 +-
13 files changed, 901 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index edcac88..572cae1 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -21,7 +21,9 @@
#include "arrow/parquet/test-util.h"
#include "arrow/parquet/reader.h"
#include "arrow/parquet/writer.h"
+#include "arrow/types/construct.h"
#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"
@@ -30,12 +32,15 @@
using ParquetBuffer = parquet::Buffer;
using parquet::BufferReader;
+using parquet::default_writer_properties;
using parquet::InMemoryOutputStream;
+using parquet::LogicalType;
using parquet::ParquetFileReader;
using parquet::ParquetFileWriter;
using parquet::RandomAccessSource;
using parquet::Repetition;
using parquet::SchemaDescriptor;
+using parquet::ParquetVersion;
using ParquetType = parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::NodePtr;
@@ -52,25 +57,113 @@ template <typename TestType>
struct test_traits {};
template <>
+struct test_traits<BooleanType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static uint8_t const value;
+};
+
+const uint8_t test_traits<BooleanType>::value(1);
+
+template <>
+struct test_traits<UInt8Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
+ static uint8_t const value;
+};
+
+const uint8_t test_traits<UInt8Type>::value(64);
+
+template <>
+struct test_traits<Int8Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
+ static int8_t const value;
+};
+
+const int8_t test_traits<Int8Type>::value(-64);
+
+template <>
+struct test_traits<UInt16Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
+ static uint16_t const value;
+};
+
+const uint16_t test_traits<UInt16Type>::value(1024);
+
+template <>
+struct test_traits<Int16Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
+ static int16_t const value;
+};
+
+const int16_t test_traits<Int16Type>::value(-1024);
+
+template <>
+struct test_traits<UInt32Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
+ static uint32_t const value;
+};
+
+const uint32_t test_traits<UInt32Type>::value(1024);
+
+template <>
struct test_traits<Int32Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static int32_t const value;
+};
+
+const int32_t test_traits<Int32Type>::value(-1024);
+
+template <>
+struct test_traits<UInt64Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
+ static uint64_t const value;
};
+const uint64_t test_traits<UInt64Type>::value(1024);
+
template <>
struct test_traits<Int64Type> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static int64_t const value;
};
+const int64_t test_traits<Int64Type>::value(-1024);
+
template <>
struct test_traits<FloatType> {
static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static float const value;
};
+const float test_traits<FloatType>::value(2.1f);
+
template <>
struct test_traits<DoubleType> {
static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static double const value;
+};
+
+const double test_traits<DoubleType>::value(4.2);
+
+template <>
+struct test_traits<StringType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
+ static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
+ static std::string const value;
};
+const std::string test_traits<StringType>::value("Test");
+
template <typename T>
using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>;
@@ -80,18 +173,18 @@ using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>;
template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
- typedef typename TestType::c_type T;
virtual void SetUp() {}
- std::shared_ptr<GroupNode> MakeSchema(
- ParquetType::type parquet_type, Repetition::type repetition) {
- auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type);
+ std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
+ auto pnode = PrimitiveNode::Make("column1", repetition,
+ test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
NodePtr node_ =
GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
return std::static_pointer_cast<GroupNode>(node_);
}
- std::unique_ptr<ParquetFileWriter> MakeWriter(std::shared_ptr<GroupNode>& schema) {
+ std::unique_ptr<ParquetFileWriter> MakeWriter(
+ const std::shared_ptr<GroupNode>& schema) {
sink_ = std::make_shared<InMemoryOutputStream>();
return ParquetFileWriter::Open(sink_, schema);
}
@@ -106,113 +199,74 @@ class TestParquetIO : public ::testing::Test {
std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
- ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
+ ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
ASSERT_NE(nullptr, column_reader.get());
+
ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
ASSERT_NE(nullptr, out->get());
}
+ void ReadAndCheckSingleColumnFile(Array* values) {
+ std::shared_ptr<Array> out;
+ ReadSingleColumnFile(ReaderFromSink(), &out);
+ ASSERT_TRUE(values->Equals(out));
+ }
+
void ReadTableFromFile(
std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
- ASSERT_NO_THROW(ASSERT_OK(reader.ReadFlatTable(out)));
+ ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
ASSERT_NE(nullptr, out->get());
}
- std::unique_ptr<ParquetFileReader> TestFile(std::vector<T>& values, int num_chunks) {
- std::shared_ptr<GroupNode> schema =
- MakeSchema(test_traits<TestType>::parquet_enum, Repetition::REQUIRED);
- std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema);
- size_t chunk_size = values.size() / num_chunks;
- for (int i = 0; i < num_chunks; i++) {
- auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
- auto column_writer =
- static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
- T* data = values.data() + i * chunk_size;
- column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
- column_writer->Close();
- row_group_writer->Close();
- }
- file_writer->Close();
- return ReaderFromSink();
+ void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
+ std::shared_ptr<Table> out;
+ ReadTableFromFile(ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(values->length(), out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+ }
+
+ template <typename ArrayType>
+ void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<ArrayType>& values) {
+ FileWriter writer(default_memory_pool(), MakeWriter(schema));
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+ ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+ ASSERT_OK_NO_THROW(writer.Close());
}
std::shared_ptr<InMemoryOutputStream> sink_;
};
-typedef ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType> TestTypes;
-
-TYPED_TEST_CASE(TestParquetIO, TestTypes);
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredRead) {
- std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
- std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1);
-
- std::shared_ptr<Array> out;
- this->ReadSingleColumnFile(std::move(file_reader), &out);
-
- ExpectArray<typename TypeParam::c_type>(values.data(), out.get());
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredTableRead) {
- std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
- std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1);
-
- std::shared_ptr<Table> out;
- this->ReadTableFromFile(std::move(file_reader), &out);
- ASSERT_EQ(1, out->num_columns());
- ASSERT_EQ(SMALL_SIZE, out->num_rows());
-
- std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
- ASSERT_EQ(1, chunked_array->num_chunks());
- ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get());
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedRead) {
- std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
- std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4);
-
- std::shared_ptr<Array> out;
- this->ReadSingleColumnFile(std::move(file_reader), &out);
+// We have separate tests for UInt32Type as this is currently the only type
+// where a roundtrip does not yield the identical Array structure.
+// There we write an UInt32 Array but receive an Int64 Array as result for
+// Parquet version 1.0.
- ExpectArray<typename TypeParam::c_type>(values.data(), out.get());
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedTableRead) {
- std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
- std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4);
-
- std::shared_ptr<Table> out;
- this->ReadTableFromFile(std::move(file_reader), &out);
- ASSERT_EQ(1, out->num_columns());
- ASSERT_EQ(SMALL_SIZE, out->num_rows());
+typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
+ Int32Type, UInt64Type, Int64Type, FloatType, DoubleType, StringType> TestTypes;
- std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
- ASSERT_EQ(1, chunked_array->num_chunks());
- ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get());
-}
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
- std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
- std::shared_ptr<GroupNode> schema =
- this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
- FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
- ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
- ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
- ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ this->WriteFlatColumn(schema, values);
- std::shared_ptr<Array> out;
- this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
- ASSERT_TRUE(values->Equals(out));
+ this->ReadAndCheckSingleColumnFile(values.get());
}
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
- std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
std::shared_ptr<Table> table = MakeSimpleTable(values, false);
this->sink_ = std::make_shared<InMemoryOutputStream>();
- ASSERT_NO_THROW(ASSERT_OK(
- WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
+ ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_,
+ values->length(), default_writer_properties()));
std::shared_ptr<Table> out;
this->ReadTableFromFile(this->ReaderFromSink(), &out);
@@ -226,113 +280,208 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
// This also tests max_definition_level = 1
- std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
+ auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
- std::shared_ptr<GroupNode> schema =
- this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
- FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
- ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
- ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
- ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+ this->WriteFlatColumn(schema, values);
- std::shared_ptr<Array> out;
- this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
- ASSERT_TRUE(values->Equals(out));
+ this->ReadAndCheckSingleColumnFile(values.get());
}
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
// This also tests max_definition_level = 1
- std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
+ std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
this->sink_ = std::make_shared<InMemoryOutputStream>();
- ASSERT_NO_THROW(ASSERT_OK(
- WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
-
- std::shared_ptr<Table> out;
- this->ReadTableFromFile(this->ReaderFromSink(), &out);
- ASSERT_EQ(1, out->num_columns());
- ASSERT_EQ(SMALL_SIZE, out->num_rows());
+ ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_,
+ values->length(), default_writer_properties()));
- std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
- ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+ this->ReadAndCheckSingleColumnTable(values);
}
-TYPED_TEST(TestParquetIO, SingleColumnIntRequiredChunkedWrite) {
- std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
- std::shared_ptr<PrimitiveArray> values_chunk =
- NonNullArray<TypeParam>(SMALL_SIZE / 4, 128);
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+ int64_t chunk_size = values->length() / 4;
- std::shared_ptr<GroupNode> schema =
- this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
for (int i = 0; i < 4; i++) {
- ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
- ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+ ASSERT_OK_NO_THROW(
+ writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
}
- ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+ ASSERT_OK_NO_THROW(writer.Close());
- std::shared_ptr<Array> out;
- this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
- ASSERT_TRUE(values->Equals(out));
+ this->ReadAndCheckSingleColumnFile(values.get());
}
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
- std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(LARGE_SIZE, 128);
+ auto values = NonNullArray<TypeParam>(LARGE_SIZE);
std::shared_ptr<Table> table = MakeSimpleTable(values, false);
this->sink_ = std::make_shared<InMemoryOutputStream>();
- ASSERT_NO_THROW(
- ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
-
- std::shared_ptr<Table> out;
- this->ReadTableFromFile(this->ReaderFromSink(), &out);
- ASSERT_EQ(1, out->num_columns());
- ASSERT_EQ(LARGE_SIZE, out->num_rows());
+ ASSERT_OK_NO_THROW(WriteFlatTable(
+ table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
- std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
- ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+ this->ReadAndCheckSingleColumnTable(values);
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
- std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
- std::shared_ptr<PrimitiveArray> values_chunk_nulls =
- NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 10);
- std::shared_ptr<PrimitiveArray> values_chunk =
- NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 0);
-
- std::shared_ptr<GroupNode> schema =
- this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
+ int64_t chunk_size = SMALL_SIZE / 4;
+ auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
- ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length())));
- ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get())));
- for (int i = 0; i < 3; i++) {
- ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
- ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+ for (int i = 0; i < 4; i++) {
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+ ASSERT_OK_NO_THROW(
+ writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
}
- ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+ ASSERT_OK_NO_THROW(writer.Close());
- std::shared_ptr<Array> out;
- this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
- ASSERT_TRUE(values->Equals(out));
+ this->ReadAndCheckSingleColumnFile(values.get());
}
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
// This also tests max_definition_level = 1
- std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(LARGE_SIZE, 128, 100);
+ auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
this->sink_ = std::make_shared<InMemoryOutputStream>();
- ASSERT_NO_THROW(
- ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
+ ASSERT_OK_NO_THROW(WriteFlatTable(
+ table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
- std::shared_ptr<Table> out;
- this->ReadTableFromFile(this->ReaderFromSink(), &out);
- ASSERT_EQ(1, out->num_columns());
- ASSERT_EQ(LARGE_SIZE, out->num_rows());
+ this->ReadAndCheckSingleColumnTable(values);
+}
- std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
- ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+using TestUInt32ParquetIO = TestParquetIO<UInt32Type>;
+
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+ // Parquet 2.0 roundtrip should yield an uint32_t column again
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ std::shared_ptr<::parquet::WriterProperties> properties =
+ ::parquet::WriterProperties::Builder()
+ .version(ParquetVersion::PARQUET_2_0)
+ ->build();
+ ASSERT_OK_NO_THROW(
+ WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+ // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
+ // reader that a column is unsigned.
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ std::shared_ptr<::parquet::WriterProperties> properties =
+ ::parquet::WriterProperties::Builder()
+ .version(ParquetVersion::PARQUET_1_0)
+ ->build();
+ ASSERT_OK_NO_THROW(
+ WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+
+ std::shared_ptr<Array> expected_values;
+ std::shared_ptr<PoolBuffer> int64_data =
+ std::make_shared<PoolBuffer>(default_memory_pool());
+ {
+ ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
+ int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+ const uint32_t* uint32_data_ptr =
+ reinterpret_cast<const uint32_t*>(values->data()->data());
+ // std::copy might be faster but this is explicit on the casts
+ for (int64_t i = 0; i < values->length(); i++) {
+ int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
+ }
+ }
+ ASSERT_OK(MakePrimitiveArray(std::make_shared<Int64Type>(), values->length(),
+ int64_data, values->null_count(), values->null_bitmap(), &expected_values));
+ this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+template <typename T>
+using ParquetCDataType = typename ParquetDataType<T>::c_type;
+
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+ typedef typename TestType::c_type T;
+
+ void TestFile(std::vector<T>& values, int num_chunks,
+ std::unique_ptr<ParquetFileReader>* file_reader) {
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+ size_t chunk_size = values.size() / num_chunks;
+ // Convert to Parquet's expected physical type
+ std::vector<uint8_t> values_buffer(
+ sizeof(ParquetCDataType<TestType>) * values.size());
+ auto values_parquet =
+ reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+ std::copy(values.cbegin(), values.cend(), values_parquet);
+ for (int i = 0; i < num_chunks; i++) {
+ auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+ auto column_writer =
+ static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+ ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
+ column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+ column_writer->Close();
+ row_group_writer->Close();
+ }
+ file_writer->Close();
+ *file_reader = this->ReaderFromSink();
+ }
+
+ void TestSingleColumnRequiredTableRead(int num_chunks) {
+ std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+ std::unique_ptr<ParquetFileReader> file_reader;
+ ASSERT_NO_THROW(TestFile(values, num_chunks, &file_reader));
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(std::move(file_reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
+ }
+
+ void TestSingleColumnRequiredRead(int num_chunks) {
+ std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+ std::unique_ptr<ParquetFileReader> file_reader;
+ ASSERT_NO_THROW(TestFile(values, num_chunks, &file_reader));
+
+ std::shared_ptr<Array> out;
+ this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+ ExpectArray<TestType>(values.data(), out.get());
+ }
+};
+
+typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
+ UInt32Type, Int32Type, UInt64Type, Int64Type, FloatType,
+ DoubleType> PrimitiveTestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+ this->TestSingleColumnRequiredRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+ this->TestSingleColumnRequiredTableRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+ this->TestSingleColumnRequiredRead(4);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+ this->TestSingleColumnRequiredTableRead(4);
}
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/parquet-schema-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc
index 8de7394..819cdd3 100644
--- a/cpp/src/arrow/parquet/parquet-schema-test.cc
+++ b/cpp/src/arrow/parquet/parquet-schema-test.cc
@@ -183,7 +183,9 @@ class TestConvertArrowSchema : public ::testing::Test {
Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
arrow_schema_ = std::make_shared<Schema>(fields);
- return ToParquetSchema(arrow_schema_.get(), &result_schema_);
+ std::shared_ptr<::parquet::WriterProperties> properties =
+ ::parquet::default_writer_properties();
+ return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_);
}
protected:
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index 3b4882d..7b05665 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -17,6 +17,7 @@
#include "arrow/parquet/reader.h"
+#include <algorithm>
#include <queue>
#include <string>
#include <vector>
@@ -27,6 +28,7 @@
#include "arrow/schema.h"
#include "arrow/table.h"
#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
#include "arrow/util/status.h"
using parquet::ColumnReader;
@@ -36,6 +38,19 @@ using parquet::TypedColumnReader;
namespace arrow {
namespace parquet {
+template <typename ArrowType>
+struct ArrowTypeTraits {
+ typedef NumericBuilder<ArrowType> builder_type;
+};
+
+template <>
+struct ArrowTypeTraits<BooleanType> {
+ typedef BooleanBuilder builder_type;
+};
+
+template <typename ArrowType>
+using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;
+
class FileReader::Impl {
public:
Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
@@ -61,9 +76,45 @@ class FlatColumnReader::Impl {
template <typename ArrowType, typename ParquetType>
Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+ template <typename ArrowType, typename ParquetType>
+ Status ReadNullableFlatBatch(const int16_t* def_levels,
+ typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+ BuilderType<ArrowType>* builder);
+ template <typename ArrowType, typename ParquetType>
+ Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
+ BuilderType<ArrowType>* builder);
+
private:
void NextRowGroup();
+ template <typename InType, typename OutType>
+ struct can_copy_ptr {
+ static constexpr bool value =
+ std::is_same<InType, OutType>::value ||
+ (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+ (sizeof(InType) == sizeof(OutType)));
+ };
+
+ template <typename InType, typename OutType,
+ typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+ Status ConvertPhysicalType(
+ const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+ *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+ return Status::OK();
+ }
+
+ template <typename InType, typename OutType,
+ typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+ Status ConvertPhysicalType(
+ const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+ RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
+ OutType* mutable_out_ptr =
+ reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
+ std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+ *out_ptr = mutable_out_ptr;
+ return Status::OK();
+ }
+
MemoryPool* pool_;
const ::parquet::ColumnDescriptor* descr_;
::parquet::ParquetFileReader* reader_;
@@ -156,12 +207,52 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor
}
template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
+ int64_t values_read, BuilderType<ArrowType>* builder) {
+ using ArrowCType = typename ArrowType::c_type;
+ using ParquetCType = typename ParquetType::c_type;
+
+ DCHECK(builder);
+ const ArrowCType* values_ptr;
+ RETURN_NOT_OK(
+ (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr)));
+ RETURN_NOT_OK(builder->Append(values_ptr, values_read));
+ return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
+ typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+ BuilderType<ArrowType>* builder) {
+ using ArrowCType = typename ArrowType::c_type;
+
+ DCHECK(builder);
+ RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
+ RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+ auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
+ uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+ int values_idx = 0;
+ for (int64_t i = 0; i < levels_read; i++) {
+ if (def_levels[i] < descr_->max_definition_level()) {
+ valid_bytes[i] = 0;
+ } else {
+ valid_bytes[i] = 1;
+ values_ptr[i] = values[values_idx++];
+ }
+ }
+ RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
+ return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
Status FlatColumnReader::Impl::TypedReadBatch(
int batch_size, std::shared_ptr<Array>* out) {
+ using ParquetCType = typename ParquetType::c_type;
+
int values_to_read = batch_size;
- NumericBuilder<ArrowType> builder(pool_, field_->type);
+ BuilderType<ArrowType> builder(pool_, field_->type);
while ((values_to_read > 0) && column_reader_) {
- values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type));
+ values_buffer_.Resize(values_to_read * sizeof(ParquetCType));
if (descr_->max_definition_level() > 0) {
def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
}
@@ -169,31 +260,62 @@ Status FlatColumnReader::Impl::TypedReadBatch(
int64_t values_read;
int64_t levels_read;
int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- auto values =
- reinterpret_cast<typename ParquetType::c_type*>(values_buffer_.mutable_data());
+ auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
values_to_read, def_levels, nullptr, values, &values_read));
values_to_read -= levels_read;
if (descr_->max_definition_level() == 0) {
- RETURN_NOT_OK(builder.Append(values, values_read));
+ RETURN_NOT_OK(
+ (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
+ } else {
+ // As per the definition and checks for flat columns:
+ // descr_->max_definition_level() == 1
+ RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
+ def_levels, values, values_read, levels_read, &builder)));
+ }
+ if (!column_reader_->HasNext()) { NextRowGroup(); }
+ }
+ *out = builder.Finish();
+ return Status::OK();
+}
+
+template <>
+Status FlatColumnReader::Impl::TypedReadBatch<StringType, ::parquet::ByteArrayType>(
+ int batch_size, std::shared_ptr<Array>* out) {
+ int values_to_read = batch_size;
+ StringBuilder builder(pool_, field_->type);
+ while ((values_to_read > 0) && column_reader_) {
+ values_buffer_.Resize(values_to_read * sizeof(::parquet::ByteArray));
+ if (descr_->max_definition_level() > 0) {
+ def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+ }
+ auto reader =
+ dynamic_cast<TypedColumnReader<::parquet::ByteArrayType>*>(column_reader_.get());
+ int64_t values_read;
+ int64_t levels_read;
+ int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ auto values = reinterpret_cast<::parquet::ByteArray*>(values_buffer_.mutable_data());
+ PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+ values_to_read, def_levels, nullptr, values, &values_read));
+ values_to_read -= levels_read;
+ if (descr_->max_definition_level() == 0) {
+ for (int64_t i = 0; i < levels_read; i++) {
+ RETURN_NOT_OK(
+ builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
+ }
} else {
// descr_->max_definition_level() == 1
- RETURN_NOT_OK(values_builder_buffer_.Resize(
- levels_read * sizeof(typename ParquetType::c_type)));
- RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
- auto values_ptr = reinterpret_cast<typename ParquetType::c_type*>(
- values_builder_buffer_.mutable_data());
- uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
int values_idx = 0;
for (int64_t i = 0; i < levels_read; i++) {
if (def_levels[i] < descr_->max_definition_level()) {
- valid_bytes[i] = 0;
+ RETURN_NOT_OK(builder.AppendNull());
} else {
- valid_bytes[i] = 1;
- values_ptr[i] = values[values_idx++];
+ RETURN_NOT_OK(
+ builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
+ values[values_idx].len));
+ values_idx++;
}
}
- builder.Append(values_ptr, levels_read, valid_bytes);
}
if (!column_reader_->HasNext()) { NextRowGroup(); }
}
@@ -214,10 +336,18 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
}
switch (field_->type->type) {
+ TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType)
+ TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(UINT32, UInt32Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type)
TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
+ TYPED_BATCH_CASE(STRING, StringType, ::parquet::ByteArrayType)
default:
return Status::NotImplemented(field_->type->ToString());
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc
index c7979db..a79342a 100644
--- a/cpp/src/arrow/parquet/schema.cc
+++ b/cpp/src/arrow/parquet/schema.cc
@@ -42,7 +42,12 @@ namespace parquet {
const auto BOOL = std::make_shared<BooleanType>();
const auto UINT8 = std::make_shared<UInt8Type>();
+const auto INT8 = std::make_shared<Int8Type>();
+const auto UINT16 = std::make_shared<UInt16Type>();
+const auto INT16 = std::make_shared<Int16Type>();
+const auto UINT32 = std::make_shared<UInt32Type>();
const auto INT32 = std::make_shared<Int32Type>();
+const auto UINT64 = std::make_shared<UInt64Type>();
const auto INT64 = std::make_shared<Int64Type>();
const auto FLOAT = std::make_shared<FloatType>();
const auto DOUBLE = std::make_shared<DoubleType>();
@@ -92,6 +97,21 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
case LogicalType::NONE:
*out = INT32;
break;
+ case LogicalType::UINT_8:
+ *out = UINT8;
+ break;
+ case LogicalType::INT_8:
+ *out = INT8;
+ break;
+ case LogicalType::UINT_16:
+ *out = UINT16;
+ break;
+ case LogicalType::INT_16:
+ *out = INT16;
+ break;
+ case LogicalType::UINT_32:
+ *out = UINT32;
+ break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
@@ -107,6 +127,9 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
case LogicalType::NONE:
*out = INT64;
break;
+ case LogicalType::UINT_64:
+ *out = UINT64;
+ break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
@@ -187,20 +210,21 @@ Status FromParquetSchema(
}
Status StructToNode(const std::shared_ptr<StructType>& type, const std::string& name,
- bool nullable, NodePtr* out) {
+ bool nullable, const ::parquet::WriterProperties& properties, NodePtr* out) {
Repetition::type repetition = Repetition::REQUIRED;
if (nullable) { repetition = Repetition::OPTIONAL; }
std::vector<NodePtr> children(type->num_children());
for (int i = 0; i < type->num_children(); i++) {
- RETURN_NOT_OK(FieldToNode(type->child(i), &children[i]));
+ RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
}
*out = GroupNode::Make(name, repetition, children);
return Status::OK();
}
-Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
+Status FieldToNode(const std::shared_ptr<Field>& field,
+ const ::parquet::WriterProperties& properties, NodePtr* out) {
LogicalType::type logical_type = LogicalType::NONE;
ParquetType::type type;
Repetition::type repetition = Repetition::REQUIRED;
@@ -231,8 +255,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
logical_type = LogicalType::INT_16;
break;
case Type::UINT32:
- type = ParquetType::INT32;
- logical_type = LogicalType::UINT_32;
+ if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
+ type = ParquetType::INT64;
+ } else {
+ type = ParquetType::INT32;
+ logical_type = LogicalType::UINT_32;
+ }
break;
case Type::INT32:
type = ParquetType::INT32;
@@ -277,7 +305,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
break;
case Type::STRUCT: {
auto struct_type = std::static_pointer_cast<StructType>(field->type);
- return StructToNode(struct_type, field->name, field->nullable, out);
+ return StructToNode(struct_type, field->name, field->nullable, properties, out);
} break;
default:
// TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
@@ -287,11 +315,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
return Status::OK();
}
-Status ToParquetSchema(
- const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out) {
+Status ToParquetSchema(const Schema* arrow_schema,
+ const ::parquet::WriterProperties& properties,
+ std::shared_ptr<::parquet::SchemaDescriptor>* out) {
std::vector<NodePtr> nodes(arrow_schema->num_fields());
for (int i = 0; i < arrow_schema->num_fields(); i++) {
- RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), &nodes[i]));
+ RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
}
NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/schema.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h
index ec5f960..39bee05 100644
--- a/cpp/src/arrow/parquet/schema.h
+++ b/cpp/src/arrow/parquet/schema.h
@@ -21,6 +21,7 @@
#include <memory>
#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
#include "arrow/schema.h"
#include "arrow/type.h"
@@ -36,10 +37,12 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr<Field
Status FromParquetSchema(
const ::parquet::SchemaDescriptor* parquet_schema, std::shared_ptr<Schema>* out);
-Status FieldToNode(const std::shared_ptr<Field>& field, ::parquet::schema::NodePtr* out);
+Status FieldToNode(const std::shared_ptr<Field>& field,
+ const ::parquet::WriterProperties& properties, ::parquet::schema::NodePtr* out);
-Status ToParquetSchema(
- const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out);
+Status ToParquetSchema(const Schema* arrow_schema,
+ const ::parquet::WriterProperties& properties,
+ std::shared_ptr<::parquet::SchemaDescriptor>* out);
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/test-util.h b/cpp/src/arrow/parquet/test-util.h
index cc8723b..68a7fb9 100644
--- a/cpp/src/arrow/parquet/test-util.h
+++ b/cpp/src/arrow/parquet/test-util.h
@@ -18,26 +18,90 @@
#include <string>
#include <vector>
+#include "arrow/test-util.h"
#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
namespace arrow {
namespace parquet {
template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NonNullArray(
- size_t size, typename ArrowType::c_type value) {
- std::vector<typename ArrowType::c_type> values(size, value);
+using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_string = std::is_same<ArrowType, StringType>;
+
+template <class ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+ std::shared_ptr<PrimitiveArray>>::type
+NonNullArray(size_t size) {
+ std::vector<typename ArrowType::c_type> values;
+ ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
builder.Append(values.data(), values.size());
return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
}
-// This helper function only supports (size/2) nulls yet.
+// Test helper for integer Arrow types: builds an array of `size` values without
+// nulls. The values come from ::arrow::test::randint, called with the bounds 0
+// and 64.
+template <class ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  using CType = typename ArrowType::c_type;
+
+  std::vector<CType> random_values;
+  ::arrow::test::randint<CType>(size, 0, 64, &random_values);
+
+  NumericBuilder<ArrowType> array_builder(
+      default_memory_pool(), std::make_shared<ArrowType>());
+  array_builder.Append(random_values.data(), random_values.size());
+  return std::static_pointer_cast<PrimitiveArray>(array_builder.Finish());
+}
+
+// Test helper for StringType: builds a string array of `size` entries without
+// nulls, every entry being the literal "test-string".
+template <class ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<StringArray>>::type
+NonNullArray(size_t size) {
+  StringBuilder string_builder(default_memory_pool(), std::make_shared<StringType>());
+  for (size_t remaining = size; remaining > 0; --remaining) {
+    string_builder.Append("test-string");
+  }
+  return std::static_pointer_cast<StringArray>(string_builder.Finish());
+}
+
+// Test helper specialization for BooleanType: builds a boolean array of `size`
+// entries without nulls. The input bytes come from ::arrow::test::randint,
+// called with the bounds 0 and 1, and are fed to a BooleanBuilder.
+template <>
+std::shared_ptr<PrimitiveArray> NonNullArray<BooleanType>(size_t size) {
+  std::vector<uint8_t> random_flags;
+  ::arrow::test::randint<uint8_t>(size, 0, 1, &random_flags);
+
+  BooleanBuilder bool_builder(default_memory_pool(), std::make_shared<BooleanType>());
+  bool_builder.Append(random_flags.data(), random_flags.size());
+  return std::static_pointer_cast<PrimitiveArray>(bool_builder.Finish());
+}
+
+// Test helper for floating point Arrow types: builds an array of `size` random
+// values in which the entries at indices 0, 2, 4, ... are null, `num_nulls`
+// entries in total. Because only every second slot is used, at most (size/2)
+// nulls are supported; no bounds check is performed.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  using CType = typename ArrowType::c_type;
+
+  std::vector<CType> random_values;
+  ::arrow::test::random_real<CType>(size, 0, 0, 1, &random_values);
+
+  // Start with every slot valid, then clear every second slot from the front.
+  std::vector<uint8_t> validity(size, 1);
+  for (size_t slot = 0, cleared = 0; cleared < num_nulls; slot += 2, ++cleared) {
+    validity[slot] = 0;
+  }
+
+  NumericBuilder<ArrowType> array_builder(
+      default_memory_pool(), std::make_shared<ArrowType>());
+  array_builder.Append(random_values.data(), random_values.size(), validity.data());
+  return std::static_pointer_cast<PrimitiveArray>(array_builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NullableArray(
- size_t size, typename ArrowType::c_type value, size_t num_nulls) {
- std::vector<typename ArrowType::c_type> values(size, value);
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+ std::shared_ptr<PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+ std::vector<typename ArrowType::c_type> values;
+ ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
@@ -49,14 +113,49 @@ std::shared_ptr<PrimitiveArray> NullableArray(
return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
}
-std::shared_ptr<Column> MakeColumn(const std::string& name,
- const std::shared_ptr<PrimitiveArray>& array, bool nullable) {
+// Test helper for StringType: builds a string array of `size` entries in which
+// the entries at indices 0, 2, 4, ... are null, `num_nulls` entries in total.
+// All other entries are the literal "test-string".
+//
+// Because only every second slot is used, at most (size/2) nulls are supported;
+// no bounds check is performed.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<StringArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  StringBuilder builder(default_memory_pool(), std::make_shared<StringType>());
+  for (size_t i = 0; i < size; i++) {
+    // Honour the validity vector: previously it was computed but ignored, so the
+    // resulting array never contained a null.
+    if (valid_bytes[i]) {
+      builder.Append("test-string");
+    } else {
+      builder.AppendNull();
+    }
+  }
+  return std::static_pointer_cast<StringArray>(builder.Finish());
+}
+
+// Test helper specialization for BooleanType: builds a boolean array of `size`
+// entries in which the entries at indices 0, 2, 4, ... are null, `num_nulls`
+// entries in total. Because only every second slot is used, at most (size/2)
+// nulls are supported; no bounds check is performed.
+template <>
+std::shared_ptr<PrimitiveArray> NullableArray<BooleanType>(
+    size_t size, size_t num_nulls) {
+  std::vector<uint8_t> random_flags;
+  ::arrow::test::randint<uint8_t>(size, 0, 1, &random_flags);
+
+  // Start with every slot valid, then clear every second slot from the front.
+  std::vector<uint8_t> validity(size, 1);
+  for (size_t slot = 0, cleared = 0; cleared < num_nulls; slot += 2, ++cleared) {
+    validity[slot] = 0;
+  }
+
+  BooleanBuilder bool_builder(default_memory_pool(), std::make_shared<BooleanType>());
+  bool_builder.Append(random_flags.data(), random_flags.size(), validity.data());
+  return std::static_pointer_cast<PrimitiveArray>(bool_builder.Finish());
+}
+
+std::shared_ptr<Column> MakeColumn(
+ const std::string& name, const std::shared_ptr<Array>& array, bool nullable) {
auto field = std::make_shared<Field>(name, array->type(), nullable);
return std::make_shared<Column>(field, array);
}
std::shared_ptr<Table> MakeSimpleTable(
- const std::shared_ptr<PrimitiveArray>& values, bool nullable) {
+ const std::shared_ptr<Array>& values, bool nullable) {
std::shared_ptr<Column> column = MakeColumn("col", values, nullable);
std::vector<std::shared_ptr<Column>> columns({column});
std::vector<std::shared_ptr<Field>> fields({column->field()});
@@ -72,6 +171,23 @@ void ExpectArray(T* expected, Array* result) {
}
}
+// Test assertion helper: checks element by element that the raw value buffer of
+// `result` matches `expected`.
+//
+// `result` is cast to PrimitiveArray without a runtime check, and `expected`
+// must hold at least result->length() elements. Validity (null) information is
+// not inspected here; only the raw values are compared.
+template <typename ArrowType>
+void ExpectArray(typename ArrowType::c_type* expected, Array* result) {
+ PrimitiveArray* p_array = static_cast<PrimitiveArray*>(result);
+ for (int64_t i = 0; i < result->length(); i++) {
+ // Non-fatal check: every mismatching index is reported.
+ EXPECT_EQ(expected[i],
+ reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
+ }
+}
+
+// Specialization for BooleanType: instead of comparing raw buffers, this builds
+// a boolean array from the byte-per-value `expected` input via BooleanBuilder
+// and compares it to `result` with Array::Equals.
+// `expected` must hold at least result->length() bytes.
+template <>
+void ExpectArray<BooleanType>(uint8_t* expected, Array* result) {
+ BooleanBuilder builder(default_memory_pool(), std::make_shared<BooleanType>());
+ builder.Append(expected, result->length());
+ std::shared_ptr<Array> expected_array = builder.Finish();
+ EXPECT_TRUE(result->Equals(expected_array));
+}
+
} // namespace parquet
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
index 4005e3b..63449bb 100644
--- a/cpp/src/arrow/parquet/writer.cc
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -25,11 +25,13 @@
#include "arrow/table.h"
#include "arrow/types/construct.h"
#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
#include "arrow/parquet/schema.h"
#include "arrow/parquet/utils.h"
#include "arrow/util/status.h"
using parquet::ParquetFileWriter;
+using parquet::ParquetVersion;
using parquet::schema::GroupNode;
namespace arrow {
@@ -41,10 +43,40 @@ class FileWriter::Impl {
Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
Status NewRowGroup(int64_t chunk_size);
- template <typename ParquetType>
+ template <typename ParquetType, typename ArrowType>
Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data,
int64_t offset, int64_t length);
+
+ // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary
+ // buffer
+  // Compile-time trait selecting the ConvertPhysicalType overload. `value` is
+  // true when InType and OutType are the same type, or when both are integral
+  // types of identical size. In that case the input pointer can be reinterpreted
+  // instead of copying the data.
+  template <typename InType, typename OutType>
+  struct can_copy_ptr {
+    static constexpr bool value =
+        std::is_same<InType, OutType>::value ||
+        (sizeof(InType) == sizeof(OutType) && std::is_integral<InType>::value &&
+            std::is_integral<OutType>::value);
+  };
+
+  // Overload chosen when can_copy_ptr<InType, OutType> holds. No data is copied:
+  // `*out_ptr` is set to the input pointer reinterpreted as OutType. The length
+  // argument is unused. Always returns Status::OK().
+  template <typename InType, typename OutType,
+      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t /* length */, const OutType** out_ptr) {
+    const OutType* reinterpreted = reinterpret_cast<const OutType*>(in_ptr);
+    *out_ptr = reinterpreted;
+    return Status::OK();
+  }
+
+  // Overload chosen when can_copy_ptr<InType, OutType> does not hold. Each of the
+  // `length` input elements is converted to OutType and stored in data_buffer_,
+  // which is resized for the purpose. `*out_ptr` then points into data_buffer_,
+  // so it is only valid until that buffer is resized or reused.
+  // Returns the error from the resize, if any.
+  template <typename InType, typename OutType,
+      typename std::enable_if<!can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
+    OutType* converted = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
+    for (int64_t i = 0; i < length; ++i) {
+      converted[i] = in_ptr[i];
+    }
+    *out_ptr = converted;
+    return Status::OK();
+  }
+
Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
+ Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length);
Status Close();
virtual ~Impl() {}
@@ -53,6 +85,8 @@ class FileWriter::Impl {
friend class FileWriter;
MemoryPool* pool_;
+ // Buffer used for storing the data of an array converted to the physical type
+ // as expected by parquet-cpp.
PoolBuffer data_buffer_;
PoolBuffer def_levels_buffer_;
std::unique_ptr<::parquet::ParquetFileWriter> writer_;
@@ -72,36 +106,95 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
return Status::OK();
}
-template <typename ParquetType>
+template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
const PrimitiveArray* data, int64_t offset, int64_t length) {
- // TODO: DCHECK((offset + length) <= data->length());
- auto data_ptr =
- reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data()) +
- offset;
+ using ArrowCType = typename ArrowType::c_type;
+ using ParquetCType = typename ParquetType::c_type;
+
+ DCHECK((offset + length) <= data->length());
+ auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
auto writer =
reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
if (writer->descr()->max_definition_level() == 0) {
// no nulls, just dump the data
- PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_ptr));
+ const ParquetCType* data_writer_ptr;
+ RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+ data_ptr, length, &data_writer_ptr)));
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr));
} else if (writer->descr()->max_definition_level() == 1) {
RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
int16_t* def_levels_ptr =
reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
if (data->null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + length, 1);
- PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, data_ptr));
+ const ParquetCType* data_writer_ptr;
+ RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+ data_ptr, length, &data_writer_ptr)));
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
} else {
- RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(typename ParquetType::c_type)));
- auto buffer_ptr =
- reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data());
+ RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
+ auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
int buffer_idx = 0;
for (int i = 0; i < length; i++) {
if (data->IsNull(offset + i)) {
def_levels_ptr[i] = 0;
} else {
def_levels_ptr[i] = 1;
- buffer_ptr[buffer_idx++] = data_ptr[i];
+ buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
+ }
+ }
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+ }
+ } else {
+ return Status::NotImplemented("no support for max definition level > 1 yet");
+ }
+ PARQUET_CATCH_NOT_OK(writer->Close());
+ return Status::OK();
+}
+
+// This specialization looks similar to the generic version but differs in two points:
+// * the offset is applied as late as possible (per element) because we need sub-byte access
+// * Arrow stores boolean data bitwise, thus we cannot use std::copy to transform from
+//   ArrowType::c_type to ParquetType::c_type
+template <>
+Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, BooleanType>(
+ ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
+ int64_t length) {
+ DCHECK((offset + length) <= data->length());
+ RETURN_NOT_OK(data_buffer_.Resize(length));
+ auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
+ auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
+ auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::BooleanType>*>(
+ column_writer);
+ if (writer->descr()->max_definition_level() == 0) {
+ // no nulls, just dump the data
+ for (int64_t i = 0; i < length; i++) {
+ buffer_ptr[i] = util::get_bit(data_ptr, offset + i);
+ }
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr));
+ } else if (writer->descr()->max_definition_level() == 1) {
+ RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+ int16_t* def_levels_ptr =
+ reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ if (data->null_count() == 0) {
+ std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+ for (int64_t i = 0; i < length; i++) {
+ buffer_ptr[i] = util::get_bit(data_ptr, offset + i);
+ }
+ // TODO(PARQUET-644): write boolean values as a packed bitmap
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+ } else {
+ int buffer_idx = 0;
+ for (int i = 0; i < length; i++) {
+ if (data->IsNull(offset + i)) {
+ def_levels_ptr[i] = 0;
+ } else {
+ def_levels_ptr[i] = 1;
+ buffer_ptr[buffer_idx++] = util::get_bit(data_ptr, offset + i);
}
}
PARQUET_CATCH_NOT_OK(
@@ -120,9 +213,9 @@ Status FileWriter::Impl::Close() {
return Status::OK();
}
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
- case Type::ENUM: \
- return TypedWriteBatch<ParquetType>(writer, data, offset, length); \
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+ case Type::ENUM: \
+ return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
break;
Status FileWriter::Impl::WriteFlatColumnChunk(
@@ -130,15 +223,76 @@ Status FileWriter::Impl::WriteFlatColumnChunk(
::parquet::ColumnWriter* writer;
PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
switch (data->type_enum()) {
- TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
- TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
- TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
- TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
+ TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType)
+ TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type)
+ case Type::UINT32:
+ if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
+ // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
+ // to use the larger Int64Type to store them lossless.
+ return TypedWriteBatch<::parquet::Int64Type, UInt32Type>(
+ writer, data, offset, length);
+ } else {
+ return TypedWriteBatch<::parquet::Int32Type, UInt32Type>(
+ writer, data, offset, length);
+ }
+ TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type)
+ TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
+ TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
+ TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
default:
return Status::NotImplemented(data->type()->ToString());
}
}
+// Writes `length` strings of `data`, starting at element `offset`, as the next
+// BYTE_ARRAY column chunk of the current row group and closes the column writer.
+//
+// The strings are not copied: for each written element a ::parquet::ByteArray
+// pointing into the value buffer of `data` is stored in data_buffer_. For
+// columns with a maximum definition level of 1, the definition levels are built
+// in def_levels_buffer_ and only the non-null elements are passed as values.
+//
+// Returns NotImplemented for columns with a maximum definition level above 1,
+// and propagates allocation errors and Parquet exceptions as Status.
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const StringArray* data, int64_t offset, int64_t length) {
+  ::parquet::ColumnWriter* column_writer;
+  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+  DCHECK((offset + length) <= data->length());
+  auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::ByteArrayType>*>(
+      column_writer);
+  const auto max_def_level = writer->descr()->max_definition_level();
+  // Reject nested columns up front, independent of the null count, as the
+  // primitive TypedWriteBatch does. Otherwise a null-free nested column would be
+  // written with definition levels of 1.
+  if (max_def_level > 1) {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(::parquet::ByteArray)));
+  auto buffer_ptr = reinterpret_cast<::parquet::ByteArray*>(data_buffer_.mutable_data());
+  auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values());
+  // Verify the cast before the result is dereferenced.
+  DCHECK(values != nullptr);
+  auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data());
+
+  // Required columns carry no definition levels, so pass nullptr for them.
+  int16_t* def_levels_ptr = nullptr;
+  if (max_def_level > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  }
+
+  if (max_def_level == 0 || data->null_count() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      // Both the length and the start of the string must be looked up at
+      // `offset + i`. Using the bare index for the start wrote the bytes of the
+      // wrong strings for every chunk that does not start at offset 0.
+      buffer_ptr[i] = ::parquet::ByteArray(
+          data->value_length(offset + i), data_ptr + data->value_offset(offset + i));
+    }
+    if (max_def_level > 0) { std::fill(def_levels_ptr, def_levels_ptr + length, 1); }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else {
+    // max_def_level == 1 with nulls present: values are packed densely, the
+    // definition levels mark which rows are null.
+    int64_t buffer_idx = 0;
+    for (int64_t i = 0; i < length; i++) {
+      if (data->IsNull(offset + i)) {
+        def_levels_ptr[i] = 0;
+      } else {
+        def_levels_ptr[i] = 1;
+        buffer_ptr[buffer_idx++] = ::parquet::ByteArray(
+            data->value_length(offset + i), data_ptr + data->value_offset(offset + i));
+      }
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
FileWriter::FileWriter(
MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
: impl_(new FileWriter::Impl(pool, std::move(writer))) {}
@@ -148,10 +302,20 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) {
}
Status FileWriter::WriteFlatColumnChunk(
- const PrimitiveArray* data, int64_t offset, int64_t length) {
+ const Array* array, int64_t offset, int64_t length) {
int64_t real_length = length;
- if (length == -1) { real_length = data->length(); }
- return impl_->WriteFlatColumnChunk(data, offset, real_length);
+ if (length == -1) { real_length = array->length(); }
+ if (array->type_enum() == Type::STRING) {
+ auto string_array = dynamic_cast<const StringArray*>(array);
+ DCHECK(string_array);
+ return impl_->WriteFlatColumnChunk(string_array, offset, real_length);
+ } else {
+ auto primitive_array = dynamic_cast<const PrimitiveArray*>(array);
+ if (!primitive_array) {
+ return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+ }
+ return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length);
+ }
}
Status FileWriter::Close() {
@@ -165,40 +329,30 @@ MemoryPool* FileWriter::memory_pool() const {
FileWriter::~FileWriter() {}
Status WriteFlatTable(const Table* table, MemoryPool* pool,
- std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) {
+ const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size,
+ const std::shared_ptr<::parquet::WriterProperties>& properties) {
std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
- RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema));
+ RETURN_NOT_OK(
+ ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema());
std::unique_ptr<ParquetFileWriter> parquet_writer =
- ParquetFileWriter::Open(sink, schema_node);
+ ParquetFileWriter::Open(sink, schema_node, properties);
FileWriter writer(pool, std::move(parquet_writer));
- // TODO: Support writing chunked arrays.
+ // TODO(ARROW-232) Support writing chunked arrays.
for (int i = 0; i < table->num_columns(); i++) {
if (table->column(i)->data()->num_chunks() != 1) {
return Status::NotImplemented("No support for writing chunked arrays yet.");
}
}
- // Cast to PrimitiveArray instances as we work with them.
- std::vector<std::shared_ptr<PrimitiveArray>> arrays(table->num_columns());
- for (int i = 0; i < table->num_columns(); i++) {
- // num_chunks == 1 as per above loop
- std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
- auto primitive_array = std::dynamic_pointer_cast<PrimitiveArray>(array);
- if (!primitive_array) {
- PARQUET_IGNORE_NOT_OK(writer.Close());
- return Status::NotImplemented("Table must consist of PrimitiveArray instances");
- }
- arrays[i] = primitive_array;
- }
-
for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
int64_t offset = chunk * chunk_size;
int64_t size = std::min(chunk_size, table->num_rows() - offset);
RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
for (int i = 0; i < table->num_columns(); i++) {
- RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(arrays[i].get(), offset, size),
+ std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
+ RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size),
PARQUET_IGNORE_NOT_OK(writer.Close()));
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
index 93693f5..cfd80d8 100644
--- a/cpp/src/arrow/parquet/writer.h
+++ b/cpp/src/arrow/parquet/writer.h
@@ -25,10 +25,12 @@
namespace arrow {
+class Array;
class MemoryPool;
class PrimitiveArray;
class RowBatch;
class Status;
+class StringArray;
class Table;
namespace parquet {
@@ -43,8 +45,7 @@ class FileWriter {
FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
Status NewRowGroup(int64_t chunk_size);
- Status WriteFlatColumnChunk(
- const PrimitiveArray* data, int64_t offset = 0, int64_t length = -1);
+ Status WriteFlatColumnChunk(const Array* data, int64_t offset = 0, int64_t length = -1);
Status Close();
virtual ~FileWriter();
@@ -62,7 +63,9 @@ class FileWriter {
* The table shall only consist of nullable, non-repeated columns of primitive type.
*/
Status WriteFlatTable(const Table* table, MemoryPool* pool,
- std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size);
+ const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size,
+ const std::shared_ptr<::parquet::WriterProperties>& properties =
+ ::parquet::default_writer_properties());
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 2f81161..055dac7 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -50,6 +50,8 @@
if (!s.ok()) { FAIL() << s.ToString(); } \
} while (0)
+#define ASSERT_OK_NO_THROW(expr) ASSERT_NO_THROW(ASSERT_OK(expr))
+
#define EXPECT_OK(expr) \
do { \
Status s = (expr); \
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/types/primitive.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc
index 08fc847..f4b47f9 100644
--- a/cpp/src/arrow/types/primitive.cc
+++ b/cpp/src/arrow/types/primitive.cc
@@ -133,6 +133,11 @@ Status PrimitiveBuilder<BooleanType>::Append(
RETURN_NOT_OK(Reserve(length));
for (int i = 0; i < length; ++i) {
+ // Skip reading from uninitialised memory.
+ // TODO: This is only needed to keep valgrind happy, but it may or may not
+ // have a performance impact.
+ if ((valid_bytes != nullptr) && !valid_bytes[i]) continue;
+
if (values[i] > 0) {
util::set_bit(raw_data_, length_ + i);
} else {
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 0918344..a2f83ea 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -32,6 +32,10 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
pass
cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
+ enum ParquetVersion" parquet::ParquetVersion::type":
+ PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
+ PARQUET_2_0" parquet::ParquetVersion::PARQUET_2_0"
+
cdef cppclass SchemaDescriptor:
shared_ptr[Node] schema()
GroupNode* group()
@@ -80,6 +84,11 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
LocalFileOutputStream(const c_string& path)
void Close()
+ cdef cppclass WriterProperties:
+ cppclass Builder:
+ Builder* version(ParquetVersion version)
+ shared_ptr[WriterProperties] build()
+
cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil:
cdef cppclass FileReader:
@@ -93,5 +102,7 @@ cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil:
cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil:
- cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, shared_ptr[OutputStream] sink, int64_t chunk_size)
+ cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool,
+ const shared_ptr[OutputStream]& sink, int64_t chunk_size,
+ const shared_ptr[WriterProperties]& properties)
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 3d5355e..0b2b208 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -24,6 +24,7 @@ cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.includes.parquet cimport *
from pyarrow.compat import tobytes
+from pyarrow.error import ArrowException
from pyarrow.error cimport check_cstatus
from pyarrow.table cimport Table
@@ -42,11 +43,13 @@ def read_table(filename, columns=None):
# in Cython (due to missing rvalue support)
reader = unique_ptr[FileReader](new FileReader(default_memory_pool(),
ParquetFileReader.OpenFile(tobytes(filename))))
- check_cstatus(reader.get().ReadFlatTable(&ctable))
+ with nogil:
+ check_cstatus(reader.get().ReadFlatTable(&ctable))
+
table.init(ctable)
return table
-def write_table(table, filename, chunk_size=None):
+def write_table(table, filename, chunk_size=None, version=None):
"""
Write a Table to Parquet format
@@ -56,16 +59,29 @@ def write_table(table, filename, chunk_size=None):
filename : string
chunk_size : int
The maximum number of rows in each Parquet RowGroup
+ version : {"1.0", "2.0"}, default "1.0"
+ The Parquet format version, defaults to 1.0
"""
cdef Table table_ = table
cdef CTable* ctable_ = table_.table
cdef shared_ptr[OutputStream] sink
+ cdef WriterProperties.Builder properties_builder
cdef int64_t chunk_size_ = 0
if chunk_size is None:
chunk_size_ = min(ctable_.num_rows(), int(2**16))
else:
chunk_size_ = chunk_size
+ if version is not None:
+ if version == "1.0":
+ properties_builder.version(PARQUET_1_0)
+ elif version == "2.0":
+ properties_builder.version(PARQUET_2_0)
+ else:
+ raise ArrowException("Unsupported Parquet format version")
+
sink.reset(new LocalFileOutputStream(tobytes(filename)))
- check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_))
+ with nogil:
+ check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink,
+ chunk_size_, properties_builder.build()))
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index d92cf4c..de9cfbb 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -42,18 +42,55 @@ def test_single_pylist_column_roundtrip(tmpdir):
data_read = col_read.data.chunk(0)
assert data_written.equals(data_read)
-def test_pandas_rountrip(tmpdir):
+def test_pandas_parquet_2_0_rountrip(tmpdir):
size = 10000
+ np.random.seed(0)
df = pd.DataFrame({
+ 'uint8': np.arange(size, dtype=np.uint8),
+ 'uint16': np.arange(size, dtype=np.uint16),
+ 'uint32': np.arange(size, dtype=np.uint32),
+ 'uint64': np.arange(size, dtype=np.uint64),
+ 'int8': np.arange(size, dtype=np.int16),
+ 'int16': np.arange(size, dtype=np.int16),
'int32': np.arange(size, dtype=np.int32),
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
- 'float64': np.arange(size, dtype=np.float64)
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0,
+ 'str': [str(x) for x in range(size)],
+ 'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None]
})
filename = tmpdir.join('pandas_rountrip.parquet')
arrow_table = A.from_pandas_dataframe(df)
- A.parquet.write_table(arrow_table, filename.strpath)
+ A.parquet.write_table(arrow_table, filename.strpath, version="2.0")
table_read = pyarrow.parquet.read_table(filename.strpath)
df_read = table_read.to_pandas()
pdt.assert_frame_equal(df, df_read)
+def test_pandas_parquet_1_0_rountrip(tmpdir):
+ size = 10000
+ np.random.seed(0)
+ df = pd.DataFrame({
+ 'uint8': np.arange(size, dtype=np.uint8),
+ 'uint16': np.arange(size, dtype=np.uint16),
+ 'uint32': np.arange(size, dtype=np.uint32),
+ 'uint64': np.arange(size, dtype=np.uint64),
+ 'int8': np.arange(size, dtype=np.int16),
+ 'int16': np.arange(size, dtype=np.int16),
+ 'int32': np.arange(size, dtype=np.int32),
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0
+ })
+ filename = tmpdir.join('pandas_rountrip.parquet')
+ arrow_table = A.from_pandas_dataframe(df)
+ A.parquet.write_table(arrow_table, filename.strpath, version="1.0")
+ table_read = pyarrow.parquet.read_table(filename.strpath)
+ df_read = table_read.to_pandas()
+
+ # We pass uint32_t as int64_t if we write Parquet version 1.0
+ df['uint32'] = df['uint32'].values.astype(np.int64)
+
+ pdt.assert_frame_equal(df, df_read)
+