You are viewing a plain text version of this content. A canonical HTML version is available in the Apache mailing-list archives.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/05/18 17:51:08 UTC
arrow git commit: ARROW-201: [C++] Initial ParquetWriter implementation
Repository: arrow
Updated Branches:
refs/heads/master 978de1a94 -> e0fb3698e
ARROW-201: [C++] Initial ParquetWriter implementation
Author: Uwe L. Korn <uw...@xhochy.com>
Closes #78 from xhochy/arrow-201 and squashes the following commits:
5d95099 [Uwe L. Korn] Add check for flat column
88ae3ca [Uwe L. Korn] Install arrow_parquet headers
f81021b [Uwe L. Korn] Incorporate reader comments
ba240e8 [Uwe L. Korn] Incorporate writer comments
2179c0e [Uwe L. Korn] Infer c-type from ArrowType
efd46fb [Uwe L. Korn] Infer c-type from ArrowType
77386ea [Uwe L. Korn] Templatize test functions
1aa7698 [Uwe L. Korn] Add comment to helper function
8fdd4c8 [Uwe L. Korn] Parameterize schema creation
8e8d7d7 [Uwe L. Korn] ARROW-201: [C++] Initial ParquetWriter implementation
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e0fb3698
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e0fb3698
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e0fb3698
Branch: refs/heads/master
Commit: e0fb3698e5602bccaee232d4c259b3df089886e6
Parents: 978de1a
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Wed May 18 10:49:04 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Wed May 18 10:49:04 2016 -0700
----------------------------------------------------------------------
cpp/src/arrow/parquet/CMakeLists.txt | 6 +-
cpp/src/arrow/parquet/parquet-io-test.cc | 222 ++++++++++++++++++++++
cpp/src/arrow/parquet/parquet-reader-test.cc | 116 -----------
cpp/src/arrow/parquet/reader.cc | 79 +++++---
cpp/src/arrow/parquet/writer.cc | 148 +++++++++++++++
cpp/src/arrow/parquet/writer.h | 59 ++++++
6 files changed, 485 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt
index cd6f05d..c00cc9f 100644
--- a/cpp/src/arrow/parquet/CMakeLists.txt
+++ b/cpp/src/arrow/parquet/CMakeLists.txt
@@ -21,6 +21,7 @@
set(PARQUET_SRCS
reader.cc
schema.cc
+ writer.cc
)
set(PARQUET_LIBS
@@ -37,14 +38,15 @@ SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX)
ADD_ARROW_TEST(parquet-schema-test)
ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
-ADD_ARROW_TEST(parquet-reader-test)
-ARROW_TEST_LINK_LIBRARIES(parquet-reader-test arrow_parquet)
+ADD_ARROW_TEST(parquet-io-test)
+ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet)
# Headers: top level
install(FILES
reader.h
schema.h
utils.h
+ writer.h
DESTINATION include/arrow/parquet)
install(TARGETS arrow_parquet
http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
new file mode 100644
index 0000000..845574d
--- /dev/null
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/parquet/reader.h"
+#include "arrow/parquet/writer.h"
+#include "arrow/types/primitive.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/reader.h"
+#include "parquet/api/writer.h"
+
+using ParquetBuffer = parquet::Buffer;
+using parquet::BufferReader;
+using parquet::InMemoryOutputStream;
+using parquet::ParquetFileReader;
+using parquet::ParquetFileWriter;
+using parquet::RandomAccessSource;
+using parquet::Repetition;
+using parquet::SchemaDescriptor;
+using ParquetType = parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::PrimitiveNode;
+
+namespace arrow {
+
+namespace parquet {
+
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NonNullArray(
+ size_t size, typename ArrowType::c_type value) {
+ std::vector<typename ArrowType::c_type> values(size, value);
+ NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+ builder.Append(values.data(), values.size());
+ return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+// This helper function currently supports at most (size/2) nulls, placed at even indices.
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NullableArray(
+ size_t size, typename ArrowType::c_type value, size_t num_nulls) {
+ std::vector<typename ArrowType::c_type> values(size, value);
+ std::vector<uint8_t> valid_bytes(size, 1);
+
+ for (size_t i = 0; i < num_nulls; i++) {
+ valid_bytes[i * 2] = 0;
+ }
+
+ NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+ builder.Append(values.data(), values.size(), valid_bytes.data());
+ return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+class TestParquetIO : public ::testing::Test {
+ public:
+ virtual void SetUp() {}
+
+ std::shared_ptr<GroupNode> Schema(
+ ParquetType::type parquet_type, Repetition::type repetition) {
+ auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type);
+ NodePtr node_ =
+ GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+ return std::static_pointer_cast<GroupNode>(node_);
+ }
+
+ std::unique_ptr<ParquetFileWriter> MakeWriter(std::shared_ptr<GroupNode>& schema) {
+ sink_ = std::make_shared<InMemoryOutputStream>();
+ return ParquetFileWriter::Open(sink_, schema);
+ }
+
+ std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+ std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+ std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+ return ParquetFileReader::Open(std::move(source));
+ }
+
+ void ReadSingleColumnFile(
+ std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+ arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
+ std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
+ ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
+ ASSERT_NE(nullptr, column_reader.get());
+ ASSERT_OK(column_reader->NextBatch(100, out));
+ ASSERT_NE(nullptr, out->get());
+ }
+
+ std::unique_ptr<ParquetFileReader> Int64File(
+ std::vector<int64_t>& values, int num_chunks) {
+ std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+ std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema);
+ size_t chunk_size = values.size() / num_chunks;
+ for (int i = 0; i < num_chunks; i++) {
+ auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+ auto column_writer =
+ static_cast<::parquet::Int64Writer*>(row_group_writer->NextColumn());
+ int64_t* data = values.data() + i * chunk_size;
+ column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+ column_writer->Close();
+ row_group_writer->Close();
+ }
+ file_writer->Close();
+ return ReaderFromSink();
+ }
+
+ private:
+ std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+TEST_F(TestParquetIO, SingleColumnInt64Read) {
+ std::vector<int64_t> values(100, 128);
+ std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
+
+ std::shared_ptr<Array> out;
+ ReadSingleColumnFile(std::move(file_reader), &out);
+
+ Int64Array* out_array = static_cast<Int64Array*>(out.get());
+ for (size_t i = 0; i < values.size(); i++) {
+ EXPECT_EQ(values[i], out_array->raw_data()[i]);
+ }
+}
+
+TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) {
+ std::vector<int64_t> values(100, 128);
+ std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
+
+ std::shared_ptr<Array> out;
+ ReadSingleColumnFile(std::move(file_reader), &out);
+
+ Int64Array* out_array = static_cast<Int64Array*>(out.get());
+ for (size_t i = 0; i < values.size(); i++) {
+ EXPECT_EQ(values[i], out_array->raw_data()[i]);
+ }
+}
+
+TEST_F(TestParquetIO, SingleColumnInt64Write) {
+ std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+
+ std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+ FileWriter writer(default_memory_pool(), MakeWriter(schema));
+ ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
+ ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
+ ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+ std::shared_ptr<Array> out;
+ ReadSingleColumnFile(ReaderFromSink(), &out);
+ ASSERT_TRUE(values->Equals(out));
+}
+
+TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+
+ std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
+ FileWriter writer(default_memory_pool(), MakeWriter(schema));
+ ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
+ ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
+ ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+ std::shared_ptr<Array> out;
+ ReadSingleColumnFile(ReaderFromSink(), &out);
+ ASSERT_TRUE(values->Equals(out));
+}
+
+TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
+ std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+ std::shared_ptr<PrimitiveArray> values_chunk = NonNullArray<Int64Type>(25, 128);
+
+ std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+ FileWriter writer(default_memory_pool(), MakeWriter(schema));
+ for (int i = 0; i < 4; i++) {
+ ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
+ ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+ }
+ ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+ std::shared_ptr<Array> out;
+ ReadSingleColumnFile(ReaderFromSink(), &out);
+ ASSERT_TRUE(values->Equals(out));
+}
+
+TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) {
+ std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+ std::shared_ptr<PrimitiveArray> values_chunk_nulls =
+ NullableArray<DoubleType>(25, 128, 10);
+ std::shared_ptr<PrimitiveArray> values_chunk = NullableArray<DoubleType>(25, 128, 0);
+
+ std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
+ FileWriter writer(default_memory_pool(), MakeWriter(schema));
+ ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length())));
+ ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get())));
+ for (int i = 0; i < 3; i++) {
+ ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
+ ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+ }
+ ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+ std::shared_ptr<Array> out;
+ ReadSingleColumnFile(ReaderFromSink(), &out);
+ ASSERT_TRUE(values->Equals(out));
+}
+
+} // namespace parquet
+
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/parquet-reader-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-reader-test.cc b/cpp/src/arrow/parquet/parquet-reader-test.cc
deleted file mode 100644
index a7fc2a8..0000000
--- a/cpp/src/arrow/parquet/parquet-reader-test.cc
+++ /dev/null
@@ -1,116 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "gtest/gtest.h"
-
-#include "arrow/test-util.h"
-#include "arrow/parquet/reader.h"
-#include "arrow/types/primitive.h"
-#include "arrow/util/memory-pool.h"
-#include "arrow/util/status.h"
-
-#include "parquet/api/reader.h"
-#include "parquet/api/writer.h"
-
-using ParquetBuffer = parquet::Buffer;
-using parquet::BufferReader;
-using parquet::InMemoryOutputStream;
-using parquet::Int64Writer;
-using parquet::ParquetFileReader;
-using parquet::ParquetFileWriter;
-using parquet::RandomAccessSource;
-using parquet::Repetition;
-using parquet::SchemaDescriptor;
-using ParquetType = parquet::Type;
-using parquet::schema::GroupNode;
-using parquet::schema::NodePtr;
-using parquet::schema::PrimitiveNode;
-
-namespace arrow {
-
-namespace parquet {
-
-class TestReadParquet : public ::testing::Test {
- public:
- virtual void SetUp() {}
-
- std::shared_ptr<GroupNode> Int64Schema() {
- auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64);
- NodePtr node_ =
- GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
- return std::static_pointer_cast<GroupNode>(node_);
- }
-
- std::unique_ptr<ParquetFileReader> Int64File(
- std::vector<int64_t>& values, int num_chunks) {
- std::shared_ptr<GroupNode> schema = Int64Schema();
- std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
- auto file_writer = ParquetFileWriter::Open(sink, schema);
- size_t chunk_size = values.size() / num_chunks;
- for (int i = 0; i < num_chunks; i++) {
- auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
- auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
- int64_t* data = values.data() + i * chunk_size;
- column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
- column_writer->Close();
- row_group_writer->Close();
- }
- file_writer->Close();
-
- std::shared_ptr<ParquetBuffer> buffer = sink->GetBuffer();
- std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
- return ParquetFileReader::Open(std::move(source));
- }
-
- private:
-};
-
-TEST_F(TestReadParquet, SingleColumnInt64) {
- std::vector<int64_t> values(100, 128);
- std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
- arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
- std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
- ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
- ASSERT_NE(nullptr, column_reader.get());
- std::shared_ptr<Array> out;
- ASSERT_OK(column_reader->NextBatch(100, &out));
- ASSERT_NE(nullptr, out.get());
- Int64Array* out_array = static_cast<Int64Array*>(out.get());
- for (size_t i = 0; i < values.size(); i++) {
- EXPECT_EQ(values[i], out_array->raw_data()[i]);
- }
-}
-
-TEST_F(TestReadParquet, SingleColumnInt64Chunked) {
- std::vector<int64_t> values(100, 128);
- std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
- arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
- std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
- ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
- ASSERT_NE(nullptr, column_reader.get());
- std::shared_ptr<Array> out;
- ASSERT_OK(column_reader->NextBatch(100, &out));
- ASSERT_NE(nullptr, out.get());
- Int64Array* out_array = static_cast<Int64Array*>(out.get());
- for (size_t i = 0; i < values.size(); i++) {
- EXPECT_EQ(values[i], out_array->raw_data()[i]);
- }
-}
-
-} // namespace parquet
-
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index 481ded5..346de25 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -26,6 +26,7 @@
#include "arrow/util/status.h"
using parquet::ColumnReader;
+using parquet::Repetition;
using parquet::TypedColumnReader;
namespace arrow {
@@ -36,6 +37,7 @@ class FileReader::Impl {
Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
virtual ~Impl() {}
+ bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr);
Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
@@ -51,7 +53,7 @@ class FlatColumnReader::Impl {
virtual ~Impl() {}
Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
- template <typename ArrowType, typename ParquetType, typename CType>
+ template <typename ArrowType, typename ParquetType>
Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
private:
@@ -67,14 +69,28 @@ class FlatColumnReader::Impl {
PoolBuffer values_buffer_;
PoolBuffer def_levels_buffer_;
- PoolBuffer rep_levels_buffer_;
+ PoolBuffer values_builder_buffer_;
+ PoolBuffer valid_bytes_buffer_;
};
FileReader::Impl::Impl(
MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
: pool_(pool), reader_(std::move(reader)) {}
+bool FileReader::Impl::CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr) {
+ if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
+ return false;
+ } else if ((descr->max_definition_level() == 1) &&
+ (descr->schema_node()->repetition() != Repetition::OPTIONAL)) {
+ return false;
+ }
+ return true;
+}
+
Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+ if (!CheckForFlatColumn(reader_->descr()->Column(i))) {
+ return Status::Invalid("The requested column is not flat");
+ }
std::unique_ptr<FlatColumnReader::Impl> impl(
new FlatColumnReader::Impl(pool_, reader_->descr()->Column(i), reader_.get(), i));
*out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
@@ -109,37 +125,50 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor
column_index_(column_index),
next_row_group_(0),
values_buffer_(pool),
- def_levels_buffer_(pool),
- rep_levels_buffer_(pool) {
+ def_levels_buffer_(pool) {
NodeToField(descr_->schema_node(), &field_);
NextRowGroup();
}
-template <typename ArrowType, typename ParquetType, typename CType>
+template <typename ArrowType, typename ParquetType>
Status FlatColumnReader::Impl::TypedReadBatch(
int batch_size, std::shared_ptr<Array>* out) {
int values_to_read = batch_size;
NumericBuilder<ArrowType> builder(pool_, field_->type);
while ((values_to_read > 0) && column_reader_) {
- values_buffer_.Resize(values_to_read * sizeof(CType));
+ values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type));
if (descr_->max_definition_level() > 0) {
def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
}
- if (descr_->max_repetition_level() > 0) {
- rep_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
- }
auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
int64_t values_read;
- CType* values = reinterpret_cast<CType*>(values_buffer_.mutable_data());
- PARQUET_CATCH_NOT_OK(
- values_to_read -= reader->ReadBatch(values_to_read,
- reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()),
- reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()), values,
- &values_read));
+ int64_t levels_read;
+ int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ auto values =
+ reinterpret_cast<typename ParquetType::c_type*>(values_buffer_.mutable_data());
+ PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+ values_to_read, def_levels, nullptr, values, &values_read));
+ values_to_read -= levels_read;
if (descr_->max_definition_level() == 0) {
RETURN_NOT_OK(builder.Append(values, values_read));
} else {
- return Status::NotImplemented("no support for definition levels yet");
+ // descr_->max_definition_level() == 1
+ RETURN_NOT_OK(values_builder_buffer_.Resize(
+ levels_read * sizeof(typename ParquetType::c_type)));
+ RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+ auto values_ptr = reinterpret_cast<typename ParquetType::c_type*>(
+ values_builder_buffer_.mutable_data());
+ uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+ int values_idx = 0;
+ for (int64_t i = 0; i < levels_read; i++) {
+ if (def_levels[i] < descr_->max_definition_level()) {
+ valid_bytes[i] = 0;
+ } else {
+ valid_bytes[i] = 1;
+ values_ptr[i] = values[values_idx++];
+ }
+ }
+ builder.Append(values_ptr, levels_read, valid_bytes);
}
if (!column_reader_->HasNext()) { NextRowGroup(); }
}
@@ -147,9 +176,9 @@ Status FlatColumnReader::Impl::TypedReadBatch(
return Status::OK();
}
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType) \
- case Type::ENUM: \
- return TypedReadBatch<ArrowType, ParquetType, CType>(batch_size, out); \
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+ case Type::ENUM: \
+ return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
break;
Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
@@ -159,15 +188,11 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
return Status::OK();
}
- if (descr_->max_repetition_level() > 0) {
- return Status::NotImplemented("no support for repetition yet");
- }
-
switch (field_->type->type) {
- TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t)
- TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t)
- TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float)
- TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double)
+ TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
+ TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
+ TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
default:
return Status::NotImplemented(field_->type->ToString());
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
new file mode 100644
index 0000000..3ad2c5b
--- /dev/null
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/parquet/writer.h"
+
+#include "arrow/array.h"
+#include "arrow/types/primitive.h"
+#include "arrow/parquet/utils.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+
+namespace parquet {
+
+class FileWriter::Impl {
+ public:
+ Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
+
+ Status NewRowGroup(int64_t chunk_size);
+ template <typename ParquetType>
+ Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data);
+ Status WriteFlatColumnChunk(const PrimitiveArray* data);
+ Status Close();
+
+ virtual ~Impl() {}
+
+ private:
+ MemoryPool* pool_;
+ PoolBuffer data_buffer_;
+ PoolBuffer def_levels_buffer_;
+ std::unique_ptr<::parquet::ParquetFileWriter> writer_;
+ ::parquet::RowGroupWriter* row_group_writer_;
+};
+
+FileWriter::Impl::Impl(
+ MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+ : pool_(pool),
+ data_buffer_(pool),
+ writer_(std::move(writer)),
+ row_group_writer_(nullptr) {}
+
+Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
+ if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+ PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+ return Status::OK();
+}
+
+template <typename ParquetType>
+Status FileWriter::Impl::TypedWriteBatch(
+ ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) {
+ auto data_ptr =
+ reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data());
+ auto writer =
+ reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
+ if (writer->descr()->max_definition_level() == 0) {
+ // no nulls, just dump the data
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr));
+ } else if (writer->descr()->max_definition_level() == 1) {
+ RETURN_NOT_OK(def_levels_buffer_.Resize(data->length() * sizeof(int16_t)));
+ int16_t* def_levels_ptr =
+ reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ if (data->null_count() == 0) {
+ std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1);
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr));
+ } else {
+ RETURN_NOT_OK(data_buffer_.Resize(
+ (data->length() - data->null_count()) * sizeof(typename ParquetType::c_type)));
+ auto buffer_ptr =
+ reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data());
+ int buffer_idx = 0;
+ for (size_t i = 0; i < data->length(); i++) {
+ if (data->IsNull(i)) {
+ def_levels_ptr[i] = 0;
+ } else {
+ def_levels_ptr[i] = 1;
+ buffer_ptr[buffer_idx++] = data_ptr[i];
+ }
+ }
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(data->length(), def_levels_ptr, nullptr, buffer_ptr));
+ }
+ } else {
+ return Status::NotImplemented("no support for max definition level > 1 yet");
+ }
+ PARQUET_CATCH_NOT_OK(writer->Close());
+ return Status::OK();
+}
+
+Status FileWriter::Impl::Close() {
+ if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+ PARQUET_CATCH_NOT_OK(writer_->Close());
+ return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+ case Type::ENUM: \
+ return TypedWriteBatch<ParquetType>(writer, data); \
+ break;
+
+Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) {
+ ::parquet::ColumnWriter* writer;
+ PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
+ switch (data->type_enum()) {
+ TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+ TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
+ TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
+ TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
+ default:
+ return Status::NotImplemented(data->type()->ToString());
+ }
+}
+
+FileWriter::FileWriter(
+ MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+ : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+ return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteFlatColumnChunk(const PrimitiveArray* data) {
+ return impl_->WriteFlatColumnChunk(data);
+}
+
+Status FileWriter::Close() {
+ return impl_->Close();
+}
+
+FileWriter::~FileWriter() {}
+
+} // namespace parquet
+
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
new file mode 100644
index 0000000..38f7d0b
--- /dev/null
+++ b/cpp/src/arrow/parquet/writer.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_PARQUET_WRITER_H
+#define ARROW_PARQUET_WRITER_H
+
+#include <memory>
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class MemoryPool;
+class PrimitiveArray;
+class RowBatch;
+class Status;
+
+namespace parquet {
+
+/**
+ * Iterative API:
+ * Start a new RowGroup/Chunk with NewRowGroup
+ * Write column-by-column the whole column chunk
+ */
+class FileWriter {
+ public:
+ FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
+
+ Status NewRowGroup(int64_t chunk_size);
+ Status WriteFlatColumnChunk(const PrimitiveArray* data);
+ Status Close();
+
+ virtual ~FileWriter();
+
+ private:
+ class Impl;
+ std::unique_ptr<Impl> impl_;
+};
+
+} // namespace parquet
+
+} // namespace arrow
+
+#endif // ARROW_PARQUET_WRITER_H