You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/09/18 18:01:49 UTC
[1/2] parquet-cpp git commit: PARQUET-712: Add library to read into
Arrow memory
Repository: parquet-cpp
Updated Branches:
refs/heads/master 8ef68b17a -> 4a7bf1174
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
new file mode 100644
index 0000000..5a38a28
--- /dev/null
+++ b/src/parquet/arrow/schema.cc
@@ -0,0 +1,351 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/schema.h"
+
+#include <string>
+#include <vector>
+
+#include "parquet/api/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/types/decimal.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::Field;
+using arrow::Status;
+using arrow::TypePtr;
+
+using ArrowType = arrow::Type;
+
+using parquet::Repetition;
+using parquet::schema::Node;
+using parquet::schema::NodePtr;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+using ParquetType = parquet::Type;
+using parquet::LogicalType;
+
+namespace parquet {
+
+namespace arrow {
+/* Shared Arrow type instances returned by the Parquet -> Arrow type mapping
+ * helpers in this file (FromByteArray, FromFLBA, FromInt32, FromInt64 and
+ * NodeToField). Each is created once at static-initialization time.
+ * BINARY is represented as a list whose element field is unnamed and of type
+ * UINT8, so it relies on UINT8 being defined earlier in this file.
+ */
+const auto BOOL = std::make_shared<::arrow::BooleanType>();
+const auto UINT8 = std::make_shared<::arrow::UInt8Type>();
+const auto INT8 = std::make_shared<::arrow::Int8Type>();
+const auto UINT16 = std::make_shared<::arrow::UInt16Type>();
+const auto INT16 = std::make_shared<::arrow::Int16Type>();
+const auto UINT32 = std::make_shared<::arrow::UInt32Type>();
+const auto INT32 = std::make_shared<::arrow::Int32Type>();
+const auto UINT64 = std::make_shared<::arrow::UInt64Type>();
+const auto INT64 = std::make_shared<::arrow::Int64Type>();
+const auto FLOAT = std::make_shared<::arrow::FloatType>();
+const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
+const auto UTF8 = std::make_shared<::arrow::StringType>();
+const auto TIMESTAMP_MS =
+ std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+const auto BINARY =
+ std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8));
+
+// Builds an Arrow decimal type carrying the precision and scale stored in the
+// decimal metadata of the given Parquet primitive node.
+TypePtr MakeDecimalType(const PrimitiveNode* node) {
+  const auto& metadata = node->decimal_metadata();
+  return std::make_shared<::arrow::DecimalType>(metadata.precision, metadata.scale);
+}
+
+// Maps a BYTE_ARRAY Parquet node to an Arrow type: UTF8 becomes a string,
+// DECIMAL a decimal, and every other logical type falls back to BINARY.
+// Always returns Status::OK().
+static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
+  const auto logical = node->logical_type();
+  if (logical == LogicalType::UTF8) {
+    *out = UTF8;
+  } else if (logical == LogicalType::DECIMAL) {
+    *out = MakeDecimalType(node);
+  } else {
+    // Anything else is treated as plain binary data.
+    *out = BINARY;
+  }
+  return Status::OK();
+}
+
+// Maps a FIXED_LEN_BYTE_ARRAY Parquet node to an Arrow type. Only the NONE
+// (-> BINARY) and DECIMAL logical types are handled; anything else yields
+// Status::NotImplemented and leaves *out untouched.
+static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
+  const auto logical = node->logical_type();
+  if (logical == LogicalType::NONE) {
+    *out = BINARY;
+    return Status::OK();
+  }
+  if (logical == LogicalType::DECIMAL) {
+    *out = MakeDecimalType(node);
+    return Status::OK();
+  }
+  return Status::NotImplemented("unhandled type");
+}
+
+// Maps an INT32 Parquet node to the Arrow type selected by its logical type.
+// Unsupported logical types yield Status::NotImplemented and leave *out
+// untouched.
+static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
+  TypePtr mapped;
+  switch (node->logical_type()) {
+    case LogicalType::NONE:
+      mapped = INT32;
+      break;
+    case LogicalType::UINT_8:
+      mapped = UINT8;
+      break;
+    case LogicalType::INT_8:
+      mapped = INT8;
+      break;
+    case LogicalType::UINT_16:
+      mapped = UINT16;
+      break;
+    case LogicalType::INT_16:
+      mapped = INT16;
+      break;
+    case LogicalType::UINT_32:
+      mapped = UINT32;
+      break;
+    case LogicalType::DECIMAL:
+      mapped = MakeDecimalType(node);
+      break;
+    default:
+      return Status::NotImplemented("Unhandled logical type for int32");
+  }
+  *out = mapped;
+  return Status::OK();
+}
+
+// Maps an INT64 Parquet node to the Arrow type selected by its logical type
+// (plain int64, uint64, decimal or millisecond timestamp). Unsupported logical
+// types yield Status::NotImplemented and leave *out untouched.
+static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
+  switch (node->logical_type()) {
+    case LogicalType::NONE:
+      *out = INT64;
+      return Status::OK();
+    case LogicalType::UINT_64:
+      *out = UINT64;
+      return Status::OK();
+    case LogicalType::DECIMAL:
+      *out = MakeDecimalType(node);
+      return Status::OK();
+    case LogicalType::TIMESTAMP_MILLIS:
+      *out = TIMESTAMP_MS;
+      return Status::OK();
+    default:
+      break;
+  }
+  return Status::NotImplemented("Unhandled logical type for int64");
+}
+
+// Converts one Parquet schema node into an Arrow field written to *out.
+//  - Repeated nodes and INT96 leaves are rejected with Status::NotImplemented.
+//  - Group nodes are converted recursively into an Arrow struct type.
+//  - Primitive nodes are mapped according to their physical (and, where
+//    relevant, logical) type.
+// The resulting field is nullable unless the node is required.
+// TODO: Logical Type Handling
+Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
+  if (node->is_repeated()) {
+    return Status::NotImplemented("No support yet for repeated node types");
+  }
+
+  std::shared_ptr<::arrow::DataType> arrow_type;
+  if (!node->is_group()) {
+    // Primitive (leaf) node
+    const auto* leaf = static_cast<const PrimitiveNode*>(node.get());
+    switch (leaf->physical_type()) {
+      case ParquetType::BOOLEAN:
+        arrow_type = BOOL;
+        break;
+      case ParquetType::INT32:
+        RETURN_NOT_OK(FromInt32(leaf, &arrow_type));
+        break;
+      case ParquetType::INT64:
+        RETURN_NOT_OK(FromInt64(leaf, &arrow_type));
+        break;
+      case ParquetType::INT96:
+        // TODO: Do we have that type in Arrow?
+        // type = TypePtr(new Int96Type());
+        return Status::NotImplemented("int96");
+      case ParquetType::FLOAT:
+        arrow_type = FLOAT;
+        break;
+      case ParquetType::DOUBLE:
+        arrow_type = DOUBLE;
+        break;
+      case ParquetType::BYTE_ARRAY:
+        // TODO: Do we have that type in Arrow?
+        RETURN_NOT_OK(FromByteArray(leaf, &arrow_type));
+        break;
+      case ParquetType::FIXED_LEN_BYTE_ARRAY:
+        RETURN_NOT_OK(FromFLBA(leaf, &arrow_type));
+        break;
+    }
+  } else {
+    // Group node: convert every child, then wrap them in a struct type.
+    const auto* group_node = static_cast<const GroupNode*>(node.get());
+    std::vector<std::shared_ptr<Field>> children(group_node->field_count());
+    for (int idx = 0; idx < group_node->field_count(); ++idx) {
+      RETURN_NOT_OK(NodeToField(group_node->field(idx), &children[idx]));
+    }
+    arrow_type = std::make_shared<::arrow::StructType>(children);
+  }
+
+  const bool nullable = !node->is_required();
+  *out = std::make_shared<Field>(node->name(), arrow_type, nullable);
+  return Status::OK();
+}
+
+// Converts every top-level field of the Parquet schema into an Arrow field
+// (via NodeToField) and assembles them into an Arrow schema stored in *out.
+// The first conversion failure is returned as-is and *out is left untouched.
+Status FromParquetSchema(
+    const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) {
+  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
+  // from the root Parquet node
+  const GroupNode* root = parquet_schema->group_node();
+  const int num_fields = root->field_count();
+
+  std::vector<std::shared_ptr<Field>> arrow_fields(num_fields);
+  int idx = 0;
+  while (idx < num_fields) {
+    RETURN_NOT_OK(NodeToField(root->field(idx), &arrow_fields[idx]));
+    ++idx;
+  }
+
+  *out = std::make_shared<::arrow::Schema>(arrow_fields);
+  return Status::OK();
+}
+
+// Converts an Arrow struct type into a Parquet group node named `name`.
+// Each child field is converted with FieldToNode; the group is OPTIONAL when
+// `nullable` is set and REQUIRED otherwise.
+Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
+    const std::string& name, bool nullable, const WriterProperties& properties,
+    NodePtr* out) {
+  const int num_children = type->num_children();
+  std::vector<NodePtr> child_nodes(num_children);
+  for (int idx = 0; idx < num_children; ++idx) {
+    RETURN_NOT_OK(FieldToNode(type->child(idx), properties, &child_nodes[idx]));
+  }
+
+  const Repetition::type repetition =
+      nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
+  *out = GroupNode::Make(name, repetition, child_nodes);
+  return Status::OK();
+}
+
+// Converts an Arrow field into a Parquet schema node written to *out.
+//
+// Primitive Arrow types are mapped to a Parquet physical type plus, where
+// applicable, a logical type annotation; struct fields are delegated to
+// StructToNode. Nullable fields become OPTIONAL nodes, all others REQUIRED.
+//
+// Returns Status::NotImplemented for Arrow types without a mapping (and for
+// timestamps whose unit is not milliseconds). A ParquetException raised while
+// constructing the primitive node is converted into Status::Invalid instead of
+// escaping from this Status-returning function.
+Status FieldToNode(const std::shared_ptr<Field>& field,
+    const WriterProperties& properties, NodePtr* out) {
+  LogicalType::type logical_type = LogicalType::NONE;
+  ParquetType::type type;
+  Repetition::type repetition = Repetition::REQUIRED;
+  if (field->nullable) { repetition = Repetition::OPTIONAL; }
+  int length = -1;
+
+  switch (field->type->type) {
+    // TODO:
+    // case ArrowType::NA:
+    // break;
+    case ArrowType::BOOL:
+      type = ParquetType::BOOLEAN;
+      break;
+    case ArrowType::UINT8:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::UINT_8;
+      break;
+    case ArrowType::INT8:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::INT_8;
+      break;
+    case ArrowType::UINT16:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::UINT_16;
+      break;
+    case ArrowType::INT16:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::INT_16;
+      break;
+    case ArrowType::UINT32:
+      // With Parquet 1.0 the values are widened to a plain INT64 column
+      // instead of using the UINT_32 annotation.
+      if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
+        type = ParquetType::INT64;
+      } else {
+        type = ParquetType::INT32;
+        logical_type = LogicalType::UINT_32;
+      }
+      break;
+    case ArrowType::INT32:
+      type = ParquetType::INT32;
+      break;
+    case ArrowType::UINT64:
+      type = ParquetType::INT64;
+      logical_type = LogicalType::UINT_64;
+      break;
+    case ArrowType::INT64:
+      type = ParquetType::INT64;
+      break;
+    case ArrowType::FLOAT:
+      type = ParquetType::FLOAT;
+      break;
+    case ArrowType::DOUBLE:
+      type = ParquetType::DOUBLE;
+      break;
+    case ArrowType::STRING:
+      type = ParquetType::BYTE_ARRAY;
+      logical_type = LogicalType::UTF8;
+      break;
+    case ArrowType::BINARY:
+      type = ParquetType::BYTE_ARRAY;
+      break;
+    case ArrowType::DATE:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::DATE;
+      break;
+    case ArrowType::TIMESTAMP: {
+      auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type.get());
+      if (timestamp_type->unit != ::arrow::TimestampType::Unit::MILLI) {
+        return Status::NotImplemented(
+            "Other timestamp units than millisecond are not yet support with parquet.");
+      }
+      type = ParquetType::INT64;
+      logical_type = LogicalType::TIMESTAMP_MILLIS;
+    } break;
+    case ArrowType::TIMESTAMP_DOUBLE:
+      type = ParquetType::INT64;
+      // This is specified as seconds since the UNIX epoch
+      // TODO: Converted type in Parquet?
+      // logical_type = LogicalType::TIMESTAMP_MILLIS;
+      break;
+    case ArrowType::TIME:
+      // NOTE(review): the Parquet format specifies TIME_MILLIS as an
+      // annotation of INT32, not INT64. If PrimitiveNode::Make validates the
+      // combination, this case now surfaces as Status::Invalid (see below)
+      // rather than as an uncaught exception -- confirm the intended mapping.
+      type = ParquetType::INT64;
+      logical_type = LogicalType::TIME_MILLIS;
+      break;
+    case ArrowType::STRUCT: {
+      auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type);
+      return StructToNode(struct_type, field->name, field->nullable, properties, out);
+    } break;
+    default:
+      // TODO: LIST, DENSE_UNION, SPARSE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
+      return Status::NotImplemented("unhandled type");
+  }
+  // Node construction may throw ParquetException; translate it into a Status
+  // the same way ToParquetSchema does for SchemaDescriptor::Init.
+  PARQUET_CATCH_NOT_OK(
+      *out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length));
+  return Status::OK();
+}
+
+// Converts an Arrow schema into a Parquet SchemaDescriptor stored in *out.
+// Every Arrow field is converted with FieldToNode and the resulting nodes are
+// placed under a root group named "schema". A ParquetException thrown while
+// initializing the descriptor is reported as Status::Invalid.
+Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
+    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out) {
+  const int num_fields = arrow_schema->num_fields();
+  std::vector<NodePtr> parquet_nodes(num_fields);
+  for (int idx = 0; idx < num_fields; ++idx) {
+    RETURN_NOT_OK(FieldToNode(arrow_schema->field(idx), properties, &parquet_nodes[idx]));
+  }
+
+  NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, parquet_nodes);
+  // *out is assigned before Init so it is set even if Init fails.
+  *out = std::make_shared<::parquet::SchemaDescriptor>();
+  PARQUET_CATCH_NOT_OK((*out)->Init(root));
+
+  return Status::OK();
+}
+
+} // namespace arrow
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
new file mode 100644
index 0000000..6917b90
--- /dev/null
+++ b/src/parquet/arrow/schema.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_SCHEMA_H
+#define PARQUET_ARROW_SCHEMA_H
+
+#include <memory>
+
+#include "arrow/schema.h"
+#include "arrow/type.h"
+#include "arrow/util/visibility.h"
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class Status;
+
+} // namespace arrow
+
+namespace parquet {
+
+namespace arrow {
+/* Converts a single Parquet schema node into an Arrow field stored in `out`.
+ * Group nodes become struct-typed fields; repeated nodes and INT96 leaves are
+ * reported as NotImplemented.
+ */
+::arrow::Status PARQUET_EXPORT NodeToField(
+ const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out);
+/* Converts all top-level fields of a Parquet schema into an Arrow schema
+ * stored in `out`. Fails with the status of the first field that cannot be
+ * converted.
+ */
+::arrow::Status PARQUET_EXPORT FromParquetSchema(
+ const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);
+/* Converts a single Arrow field into a Parquet schema node stored in `out`.
+ * `properties` is consulted for the Parquet format version when mapping
+ * UINT32. Arrow types without a mapping are reported as NotImplemented.
+ */
+::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field,
+ const WriterProperties& properties, schema::NodePtr* out);
+/* Converts an Arrow schema into a Parquet SchemaDescriptor stored in `out`,
+ * converting each field with FieldToNode.
+ */
+::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema,
+ const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out);
+
+} // namespace arrow
+
+} // namespace parquet
+
+#endif // PARQUET_ARROW_SCHEMA_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
new file mode 100644
index 0000000..deac9f7
--- /dev/null
+++ b/src/parquet/arrow/test-util.h
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "arrow/test-util.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+
+namespace parquet {
+
+namespace arrow {
+/* Trait: true when the Arrow type's c_type is a floating point type. */
+template <typename ArrowType>
+using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
+/* Trait: true when the Arrow type's c_type is an integral type. */
+template <typename ArrowType>
+using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
+/* Trait: true only for ::arrow::StringType itself. */
+template <typename ArrowType>
+using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
+
+// Builds an array of `size` random floating point values without any nulls.
+// Selected for Arrow types whose c_type is a floating point type.
+template <class ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  using CType = typename ArrowType::c_type;
+
+  std::vector<CType> random_values;
+  ::arrow::test::random_real<CType>(size, 0, 0, 1, &random_values);
+
+  ::arrow::NumericBuilder<ArrowType> array_builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  array_builder.Append(random_values.data(), random_values.size());
+  auto finished = array_builder.Finish();
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(finished);
+}
+
+// Builds an array of `size` random integers (drawn with bounds 0 and 64)
+// without any nulls. Selected for Arrow types whose c_type is integral.
+template <class ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  using CType = typename ArrowType::c_type;
+
+  std::vector<CType> random_values;
+  ::arrow::test::randint<CType>(size, 0, 64, &random_values);
+
+  ::arrow::NumericBuilder<ArrowType> array_builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  array_builder.Append(random_values.data(), random_values.size());
+  auto finished = array_builder.Finish();
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(finished);
+}
+
+// Builds a string array holding `size` copies of "test-string" and no nulls.
+template <class ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<::arrow::StringArray>>::type
+NonNullArray(size_t size) {
+  ::arrow::StringBuilder string_builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
+  for (size_t remaining = size; remaining > 0; --remaining) {
+    string_builder.Append("test-string");
+  }
+  auto finished = string_builder.Finish();
+  return std::static_pointer_cast<::arrow::StringArray>(finished);
+}
+
+// Boolean specialization: builds an array of `size` random boolean values
+// (generated as bytes with randint bounds 0 and 1) without any nulls.
+//
+// Declared `inline` because an explicit specialization is an ordinary function
+// definition; without it, including this header from more than one
+// translation unit of the same binary produces duplicate symbols.
+template <>
+inline std::shared_ptr<::arrow::PrimitiveArray> NonNullArray<::arrow::BooleanType>(
+    size_t size) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// Builds an array of `size` random floating point values in which every
+// even-indexed slot among the first 2 * num_nulls entries is null.
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  using CType = typename ArrowType::c_type;
+
+  std::vector<CType> random_values;
+  ::arrow::test::random_real<CType>(size, 0, 0, 1, &random_values);
+
+  // 1 = valid, 0 = null; nulls are placed at indices 0, 2, 4, ...
+  std::vector<uint8_t> validity(size, 1);
+  for (size_t null_idx = 0; null_idx < num_nulls; ++null_idx) {
+    validity[null_idx * 2] = 0;
+  }
+
+  ::arrow::NumericBuilder<ArrowType> array_builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  array_builder.Append(random_values.data(), random_values.size(), validity.data());
+  auto finished = array_builder.Finish();
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(finished);
+}
+
+// Builds an array of `size` random integers (drawn with bounds 0 and 64) in
+// which every even-indexed slot among the first 2 * num_nulls entries is null.
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  using CType = typename ArrowType::c_type;
+
+  std::vector<CType> random_values;
+  ::arrow::test::randint<CType>(size, 0, 64, &random_values);
+
+  // 1 = valid, 0 = null; nulls are placed at indices 0, 2, 4, ...
+  std::vector<uint8_t> validity(size, 1);
+  for (size_t null_idx = 0; null_idx < num_nulls; ++null_idx) {
+    validity[null_idx * 2] = 0;
+  }
+
+  ::arrow::NumericBuilder<ArrowType> array_builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  array_builder.Append(random_values.data(), random_values.size(), validity.data());
+  auto finished = array_builder.Finish();
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(finished);
+}
+
+// Builds a string array of `size` entries in which every even-indexed slot
+// among the first 2 * num_nulls entries is null and all remaining entries are
+// "test-string".
+//
+// Previously the validity bytes were computed but never consulted, so the
+// returned array contained no nulls at all regardless of `num_nulls`.
+//
+// This helper function only supports (size/2) nulls yet.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<::arrow::StringArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  // 1 = valid, 0 = null; nulls are placed at indices 0, 2, 4, ...
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::StringBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
+  for (size_t i = 0; i < size; i++) {
+    if (valid_bytes[i]) {
+      builder.Append("test-string");
+    } else {
+      builder.AppendNull();
+    }
+  }
+  return std::static_pointer_cast<::arrow::StringArray>(builder.Finish());
+}
+
+// Boolean specialization: builds an array of `size` random boolean values in
+// which every even-indexed slot among the first 2 * num_nulls entries is null.
+//
+// Declared `inline` because an explicit specialization is an ordinary function
+// definition; without it, including this header from more than one
+// translation unit of the same binary produces duplicate symbols.
+//
+// This helper function only supports (size/2) nulls yet.
+template <>
+inline std::shared_ptr<::arrow::PrimitiveArray> NullableArray<::arrow::BooleanType>(
+    size_t size, size_t num_nulls) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  // 1 = valid, 0 = null; nulls are placed at indices 0, 2, 4, ...
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// Wraps `array` in a column whose field has the given name, the array's type
+// and the requested nullability.
+//
+// Declared `inline` because this is a non-template function defined in a
+// header; without it, including the header from more than one translation
+// unit of the same binary produces duplicate symbols.
+inline std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
+    const std::shared_ptr<::arrow::Array>& array, bool nullable) {
+  auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
+  return std::make_shared<::arrow::Column>(field, array);
+}
+
+// Builds a table named "table" with a single column "col" holding `values`;
+// the schema consists of that column's field only.
+//
+// Declared `inline` because this is a non-template function defined in a
+// header; without it, including the header from more than one translation
+// unit of the same binary produces duplicate symbols.
+inline std::shared_ptr<::arrow::Table> MakeSimpleTable(
+    const std::shared_ptr<::arrow::Array>& values, bool nullable) {
+  std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable);
+  std::vector<std::shared_ptr<::arrow::Column>> columns({column});
+  std::vector<std::shared_ptr<::arrow::Field>> fields({column->field()});
+  auto schema = std::make_shared<::arrow::Schema>(fields);
+  return std::make_shared<::arrow::Table>("table", schema, columns);
+}
+
+// Checks element by element that the raw data buffer of `result`, viewed as
+// an array of T, equals `expected`. `result` is treated as a PrimitiveArray
+// and `expected` must hold at least result->length() values.
+template <typename T>
+void ExpectArray(T* expected, ::arrow::Array* result) {
+  auto primitive = static_cast<::arrow::PrimitiveArray*>(result);
+  for (int idx = 0; idx < result->length(); idx++) {
+    const T* actual_values = reinterpret_cast<const T*>(primitive->data()->data());
+    EXPECT_EQ(expected[idx], actual_values[idx]);
+  }
+}
+
+// Same check as the overload templated on the value type, but parameterized on
+// the Arrow type: the data buffer of `result` is viewed as ArrowType::c_type.
+template <typename ArrowType>
+void ExpectArray(typename ArrowType::c_type* expected, ::arrow::Array* result) {
+  using CType = typename ArrowType::c_type;
+  auto primitive = static_cast<::arrow::PrimitiveArray*>(result);
+  for (int64_t idx = 0; idx < result->length(); idx++) {
+    const CType* actual_values =
+        reinterpret_cast<const CType*>(primitive->data()->data());
+    EXPECT_EQ(expected[idx], actual_values[idx]);
+  }
+}
+
+// Boolean specialization: `expected` holds one byte per value, so it is first
+// turned into a boolean array with a BooleanBuilder and then compared with
+// `result` through Array::Equals.
+//
+// Declared `inline` because an explicit specialization is an ordinary function
+// definition; without it, including this header from more than one
+// translation unit of the same binary produces duplicate symbols.
+template <>
+inline void ExpectArray<::arrow::BooleanType>(
+    uint8_t* expected, ::arrow::Array* result) {
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(expected, result->length());
+  std::shared_ptr<::arrow::Array> expected_array = builder.Finish();
+  EXPECT_TRUE(result->Equals(expected_array));
+}
+
+} // namespace arrow
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/utils.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/utils.h b/src/parquet/arrow/utils.h
new file mode 100644
index 0000000..b443c99
--- /dev/null
+++ b/src/parquet/arrow/utils.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_UTILS_H
+#define PARQUET_ARROW_UTILS_H
+
+#include <sstream>
+
+#include "arrow/util/status.h"
+#include "parquet/exception.h"
+
+namespace parquet {
+namespace arrow {
+/* Bridging macros between parquet-cpp exceptions and ::arrow::Status.
+ *
+ * PARQUET_CATCH_NOT_OK(s): evaluates `s`; if it throws a ParquetException the
+ *   enclosing function returns Status::Invalid carrying e.what(). It can
+ *   therefore only be used inside a function returning ::arrow::Status.
+ * PARQUET_IGNORE_NOT_OK(s): evaluates `s` and silently discards any
+ *   ParquetException it throws.
+ * PARQUET_THROW_NOT_OK(s): evaluates the Status expression `s` and, when it is
+ *   not ok(), throws a ParquetException whose message is "Arrow error: "
+ *   followed by the status string.
+ *
+ * NOTE(review): the first two expand to a bare try/catch block and the third
+ * ends in `while (0);` including the semicolon, so none of them can be used as
+ * the unbraced body of an `if` that has an `else` -- confirm call sites before
+ * changing the expansions.
+ */
+#define PARQUET_CATCH_NOT_OK(s) \
+ try { \
+ (s); \
+ } catch (const ::parquet::ParquetException& e) { \
+ return ::arrow::Status::Invalid(e.what()); \
+ }
+
+#define PARQUET_IGNORE_NOT_OK(s) \
+ try { \
+ (s); \
+ } catch (const ::parquet::ParquetException& e) {}
+
+#define PARQUET_THROW_NOT_OK(s) \
+ do { \
+ ::arrow::Status _s = (s); \
+ if (!_s.ok()) { \
+ std::stringstream ss; \
+ ss << "Arrow error: " << _s.ToString(); \
+ throw ::parquet::ParquetException(ss.str()); \
+ } \
+ } while (0);
+
+} // namespace arrow
+} // namespace parquet
+
+#endif // PARQUET_ARROW_UTILS_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
new file mode 100644
index 0000000..5b5f41f
--- /dev/null
+++ b/src/parquet/arrow/writer.cc
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/writer.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/array.h"
+#include "arrow/column.h"
+#include "arrow/table.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::MemoryPool;
+using arrow::PoolBuffer;
+using arrow::PrimitiveArray;
+using arrow::Status;
+using arrow::StringArray;
+using arrow::Table;
+
+using parquet::ParquetFileWriter;
+using parquet::ParquetVersion;
+using parquet::schema::GroupNode;
+
+namespace parquet {
+namespace arrow {
+/* Implementation class nested in FileWriter. It owns the ParquetFileWriter,
+ * tracks the row group currently being written and keeps two scratch buffers
+ * that are reused across column writes.
+ */
+class FileWriter::Impl {
+ public:
+ Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+
+ Status NewRowGroup(int64_t chunk_size);  // Closes the open row group, if any, and appends a new one.
+ template <typename ParquetType, typename ArrowType>
+ Status TypedWriteBatch(ColumnWriter* writer, const PrimitiveArray* data,
+ int64_t offset, int64_t length);  // Writes data[offset, offset + length) through `writer`.
+
+ // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary
+ // buffer
+ // can_copy_ptr::value is true when InType and OutType are the same type, or
+ // both are integral types of equal size. ConvertPhysicalType then simply
+ // reinterprets the input pointer instead of copying the values.
+ template <typename InType, typename OutType>
+ struct can_copy_ptr {
+ static constexpr bool value =
+ std::is_same<InType, OutType>::value ||
+ (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+ (sizeof(InType) == sizeof(OutType)));
+ };
+
+ template <typename InType, typename OutType,
+ typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+ Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) {
+ *out_ptr = reinterpret_cast<const OutType*>(in_ptr);  // No copy: the input memory is reused as-is.
+ return Status::OK();
+ }
+
+ template <typename InType, typename OutType,
+ typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+ Status ConvertPhysicalType(
+ const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+ RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
+ OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
+ std::copy(in_ptr, in_ptr + length, mutable_out_ptr);  // Element-wise conversion into data_buffer_.
+ *out_ptr = mutable_out_ptr;  // Points into data_buffer_, so it is invalidated by the next Resize.
+ return Status::OK();
+ }
+
+ Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
+ Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length);
+ Status Close();  // Closes the open row group, if any, and then the file writer.
+
+ virtual ~Impl() {}
+
+ private:
+ friend class FileWriter;
+
+ MemoryPool* pool_;
+ // Buffer used for storing the data of an array converted to the physical type
+ // as expected by parquet-cpp.
+ PoolBuffer data_buffer_;
+ PoolBuffer def_levels_buffer_;  // Scratch space for the int16_t definition levels of nullable columns.
+ std::unique_ptr<ParquetFileWriter> writer_;
+ RowGroupWriter* row_group_writer_;  // Row group currently being written; nullptr until NewRowGroup is called.
+};
+
+// Takes ownership of `writer`. Both scratch buffers are bound to the
+// caller-supplied memory pool; previously only data_buffer_ received `pool`
+// while def_levels_buffer_ was default-constructed and therefore did not use
+// the pool the caller asked for.
+FileWriter::Impl::Impl(
+    MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+    : pool_(pool),
+      data_buffer_(pool),
+      def_levels_buffer_(pool),
+      writer_(std::move(writer)),
+      row_group_writer_(nullptr) {}
+
+// Starts a new row group sized for `chunk_size` rows, closing the row group
+// that is currently open (if any) first. ParquetExceptions from either step
+// are reported as Status::Invalid.
+Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
+  const bool has_open_row_group = (row_group_writer_ != nullptr);
+  if (has_open_row_group) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+
+  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+  return Status::OK();
+}
+
+// Writes the slice data[offset, offset + length) through `column_writer` and
+// closes that column writer afterwards.
+//
+//  - max definition level 0: values are written directly (after conversion to
+//    the Parquet physical type, which may be a plain pointer reinterpretation).
+//  - max definition level 1: definition levels are produced alongside the
+//    values; null slots get level 0 and are skipped in the value buffer.
+//  - anything deeper is reported as Status::NotImplemented.
+//
+// Loop index and value counter are int64_t to match `length`; with `int` they
+// would overflow for slices longer than INT_MAX.
+template <typename ParquetType, typename ArrowType>
+Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  DCHECK((offset + length) <= data->length());
+  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
+  auto writer =
+      reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    const ParquetCType* data_writer_ptr = nullptr;
+    RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+        data_ptr, length, &data_writer_ptr)));
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      // Nullable column without actual nulls: every value is defined.
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      const ParquetCType* data_writer_ptr = nullptr;
+      RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+          data_ptr, length, &data_writer_ptr)));
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
+    } else {
+      // Compact the non-null values to the front of data_buffer_ while
+      // recording one definition level per slot.
+      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
+      auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
+      int64_t buffer_idx = 0;
+      for (int64_t i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          // data_ptr already includes `offset`.
+          buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+// Boolean specialization of TypedWriteBatch. It looks similar to the generic
+// version but differs in two significant points:
+// * the offset is applied as late as possible (per bit lookup) because the
+//   Arrow data is accessed at sub-byte granularity
+// * Arrow data is stored bitwise, thus we cannot use std::copy to transform
+//   from ArrowType::c_type to ParquetType::c_type
+//
+// All loops use int64_t indices to match `length`; the null-handling loop
+// previously used `int`, unlike the other two loops in this function.
+template <>
+Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
+    ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
+    int64_t length) {
+  DCHECK((offset + length) <= data->length());
+  // One bool per value; filled by unpacking bits from the Arrow bitmap.
+  RETURN_NOT_OK(data_buffer_.Resize(length));
+  auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
+  auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
+  auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = ::arrow::util::get_bit(data_ptr, offset + i);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      // Nullable column without actual nulls: every value is defined.
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      for (int64_t i = 0; i < length; i++) {
+        buffer_ptr[i] = ::arrow::util::get_bit(data_ptr, offset + i);
+      }
+      // TODO(PARQUET-644): write boolean values as a packed bitmap
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    } else {
+      // Compact the non-null values to the front of the buffer while
+      // recording one definition level per slot.
+      int64_t buffer_idx = 0;
+      for (int64_t i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = ::arrow::util::get_bit(data_ptr, offset + i);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+// Closes the row group writer, if one exists, and then the file writer.
+Status FileWriter::Impl::Close() {
+  if (row_group_writer_ != nullptr) {
+    PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+  }
+  PARQUET_CATCH_NOT_OK(writer_->Close());
+  return Status::OK();
+}
+
+// Expands to a switch case for ::arrow::Type::ENUM that returns the result of
+// the TypedWriteBatch instantiation for the given Parquet/Arrow type pair.
+// `writer`, `data`, `offset` and `length` must be in scope where it is used.
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+ case ::arrow::Type::ENUM: \
+ return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
+ break;
+
+// Writes `length` values of the primitive array `data`, starting at `offset`,
+// to the next column of the current row group. The Arrow type of `data`
+// selects the TypedWriteBatch instantiation; types without a case below
+// return NotImplemented carrying the type's string representation.
+Status FileWriter::Impl::WriteFlatColumnChunk(
+ const PrimitiveArray* data, int64_t offset, int64_t length) {
+ ColumnWriter* writer;
+ PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
+ switch (data->type_enum()) {
+ TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+ TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+ TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+ TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+ TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+ case ::arrow::Type::UINT32:
+ if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
+ // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
+ // to use the larger Int64Type to store them lossless.
+ return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
+ writer, data, offset, length);
+ } else {
+ return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
+ writer, data, offset, length);
+ }
+ TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+ TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+ TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+ TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+ TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+ TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+ default:
+ return Status::NotImplemented(data->type()->ToString());
+ }
+}
+
+// Writes `length` strings of `data`, starting at `offset`, to the next column
+// of the current row group as ByteArray values. Each ByteArray points into the
+// string array's value buffer; no string bytes are copied here. Returns
+// NotImplemented when the array has nulls and the column's maximum definition
+// level exceeds 1.
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const StringArray* data, int64_t offset, int64_t length) {
+  ColumnWriter* column_writer;
+  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ByteArray)));
+  auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
+  auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values());
+  // Validate the cast before its result is dereferenced.
+  DCHECK(values != nullptr);
+  auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data());
+  auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
+  if (writer->descr()->max_definition_level() > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+  }
+  int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      // Both the length and the start of a value are looked up at the
+      // absolute position `i + offset` within the source array.
+      buffer_ptr[i] = ByteArray(
+          data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+    }
+    if (writer->descr()->max_definition_level() > 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    // Null slots only get a definition level of 0; the non-null values are
+    // packed densely at the front of the buffer.
+    int64_t buffer_idx = 0;
+    for (int64_t i = 0; i < length; i++) {
+      if (data->IsNull(offset + i)) {
+        def_levels_ptr[i] = 0;
+      } else {
+        def_levels_ptr[i] = 1;
+        buffer_ptr[buffer_idx++] = ByteArray(
+            data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+      }
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+// Builds the private implementation from `pool` and takes over ownership of
+// the Parquet file writer, which is moved into it.
+FileWriter::FileWriter(
+ MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+ : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+
+// Forwards to the implementation; `chunk_size` is passed through unchanged.
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+ return impl_->NewRowGroup(chunk_size);
+}
+
+// Writes `length` values of `array`, starting at `offset`, as the next column
+// chunk. A `length` of -1 selects everything from `offset` to the end of the
+// array. STRING arrays and PrimitiveArray instances are supported; any other
+// array returns NotImplemented.
+Status FileWriter::WriteFlatColumnChunk(
+    const ::arrow::Array* array, int64_t offset, int64_t length) {
+  // The implicit length has to account for `offset`; otherwise the slice
+  // [offset, offset + length) would run past the end of the array.
+  const int64_t real_length = (length == -1) ? array->length() - offset : length;
+  if (array->type_enum() == ::arrow::Type::STRING) {
+    auto string_array = dynamic_cast<const ::arrow::StringArray*>(array);
+    DCHECK(string_array);
+    return impl_->WriteFlatColumnChunk(string_array, offset, real_length);
+  } else {
+    auto primitive_array = dynamic_cast<const PrimitiveArray*>(array);
+    if (!primitive_array) {
+      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+    }
+    return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length);
+  }
+}
+
+// Forwards to the implementation's Close().
+Status FileWriter::Close() {
+ return impl_->Close();
+}
+
+// Returns the memory pool held by the implementation.
+MemoryPool* FileWriter::memory_pool() const {
+ return impl_->pool_;
+}
+
+// NOTE(review): presumably defined out of line because the header only
+// forward-declares Impl, which std::unique_ptr<Impl> must see completely to
+// destroy it -- confirm.
+FileWriter::~FileWriter() {}
+
+// Writes `table` to `sink` as a Parquet file, splitting its rows into row
+// groups of at most `chunk_size` rows each.
+//
+// Returns Invalid if `chunk_size` is not positive while the table has rows,
+// NotImplemented if any column consists of more than one chunk, and otherwise
+// the first error reported by the schema conversion or the writer.
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties) {
+  // With a non-positive chunk size the row offset below would never move past
+  // the first row, so the row-group loop would not terminate.
+  if (chunk_size <= 0 && table->num_rows() > 0) {
+    return Status::Invalid("chunk_size must be greater than 0");
+  }
+
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  RETURN_NOT_OK(
+      ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+  std::unique_ptr<ParquetFileWriter> parquet_writer =
+      ParquetFileWriter::Open(sink, schema_node, properties);
+  FileWriter writer(pool, std::move(parquet_writer));
+
+  // TODO(ARROW-232) Support writing chunked arrays.
+  for (int i = 0; i < table->num_columns(); i++) {
+    if (table->column(i)->data()->num_chunks() != 1) {
+      return Status::NotImplemented("No support for writing chunked arrays yet.");
+    }
+  }
+
+  // One row group per slice of `chunk_size` rows; the last one may be shorter.
+  for (int64_t offset = 0; offset < table->num_rows(); offset += chunk_size) {
+    int64_t size = std::min(chunk_size, table->num_rows() - offset);
+    RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
+    for (int i = 0; i < table->num_columns(); i++) {
+      std::shared_ptr<::arrow::Array> array = table->column(i)->data()->chunk(0);
+      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size),
+          PARQUET_IGNORE_NOT_OK(writer.Close()));
+    }
+  }
+
+  return writer.Close();
+}
+
+} // namespace arrow
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
new file mode 100644
index 0000000..92524d8
--- /dev/null
+++ b/src/parquet/arrow/writer.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_WRITER_H
+#define PARQUET_ARROW_WRITER_H
+
+#include <memory>
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class PrimitiveArray;
+class RowBatch;
+class Status;
+class StringArray;
+class Table;
+}
+
+namespace parquet {
+
+namespace arrow {
+
+/**
+ * Iterative API:
+ * Start a new RowGroup/Chunk with NewRowGroup
+ * Write column-by-column the whole column chunk
+ */
+class PARQUET_EXPORT FileWriter {
+ public:
+ // Takes ownership of `writer`; `pool` is retained and returned by memory_pool().
+ FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+
+ // Starts a new row group. WriteFlatTable passes the number of rows the row
+ // group will hold as `chunk_size`.
+ ::arrow::Status NewRowGroup(int64_t chunk_size);
+ // Writes `length` values of `data`, starting at `offset`, as the next column
+ // chunk. When `length` is left at -1 the length is derived from the array
+ // instead. STRING arrays and PrimitiveArray instances are supported; other
+ // arrays return NotImplemented.
+ ::arrow::Status WriteFlatColumnChunk(
+ const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1);
+ // Closes the writer.
+ ::arrow::Status Close();
+
+ virtual ~FileWriter();
+
+ ::arrow::MemoryPool* memory_pool() const;
+
+ private:
+ // Implementation details live in Impl, which is only forward-declared here.
+ class PARQUET_NO_EXPORT Impl;
+ std::unique_ptr<Impl> impl_;
+};
+
+/**
+ * Write a flat Table to Parquet.
+ *
+ * The table shall only consist of nullable, non-repeated columns of primitive type.
+ */
+::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
+ ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
+ int64_t chunk_size,
+ const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+
+} // namespace arrow
+
+} // namespace parquet
+
+#endif // PARQUET_ARROW_WRITER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/thirdparty/build_thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/build_thirdparty.sh b/thirdparty/build_thirdparty.sh
index dca586c..8229484 100755
--- a/thirdparty/build_thirdparty.sh
+++ b/thirdparty/build_thirdparty.sh
@@ -15,6 +15,7 @@ else
# Allow passing specific libs to build on the command line
for arg in "$*"; do
case $arg in
+ "arrow") F_ARROW=1 ;;
"zlib") F_ZLIB=1 ;;
"gbenchmark") F_GBENCHMARK=1 ;;
"gtest") F_GTEST=1 ;;
@@ -57,6 +58,15 @@ fi
STANDARD_DARWIN_FLAGS="-std=c++11 -stdlib=libc++"
+# build arrow
+if [ -n "$F_ALL" -o -n "$F_ARROW" ]; then
+ cd $TP_DIR/$ARROW_BASEDIR/cpp
+ source ./setup_build_env.sh
+ cmake . -DARROW_PARQUET=OFF -DARROW_HDFS=ON -DCMAKE_INSTALL_PREFIX=$PREFIX
+ make -j$PARALLEL install
+ # :
+fi
+
# build googletest
GOOGLETEST_ERROR="failed for googletest!"
if [ -n "$F_ALL" -o -n "$F_GTEST" ]; then
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/thirdparty/download_thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/download_thirdparty.sh b/thirdparty/download_thirdparty.sh
index fae6c9c..23bdb96 100755
--- a/thirdparty/download_thirdparty.sh
+++ b/thirdparty/download_thirdparty.sh
@@ -20,6 +20,11 @@ download_extract_and_cleanup() {
rm $filename
}
+if [ ! -d ${ARROW_BASEDIR} ]; then
+ echo "Fetching arrow"
+ download_extract_and_cleanup $ARROW_URL
+fi
+
if [ ! -d ${SNAPPY_BASEDIR} ]; then
echo "Fetching snappy"
download_extract_and_cleanup $SNAPPY_URL
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/thirdparty/set_thirdparty_env.sh
----------------------------------------------------------------------
diff --git a/thirdparty/set_thirdparty_env.sh b/thirdparty/set_thirdparty_env.sh
index 80715ef..3733d08 100644
--- a/thirdparty/set_thirdparty_env.sh
+++ b/thirdparty/set_thirdparty_env.sh
@@ -7,6 +7,7 @@ if [ -z "$THIRDPARTY_DIR" ]; then
THIRDPARTY_DIR=$SOURCE_DIR
fi
+export ARROW_HOME=$THIRDPARTY_DIR/installed
export SNAPPY_HOME=$THIRDPARTY_DIR/installed
export ZLIB_HOME=$THIRDPARTY_DIR/installed
# build script doesn't support building thrift on OSX
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/thirdparty/versions.sh
----------------------------------------------------------------------
diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh
index b56262a..05f5cc2 100755
--- a/thirdparty/versions.sh
+++ b/thirdparty/versions.sh
@@ -1,3 +1,7 @@
+ARROW_VERSION="6b8abb4402ff1f39fc5944a7df6e3b4755691d87"
+ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz"
+ARROW_BASEDIR="arrow-${ARROW_VERSION}"
+
SNAPPY_VERSION=1.1.3
SNAPPY_URL="https://github.com/google/snappy/releases/download/${SNAPPY_VERSION}/snappy-${SNAPPY_VERSION}.tar.gz"
SNAPPY_BASEDIR=snappy-$SNAPPY_VERSION
[2/2] parquet-cpp git commit: PARQUET-712: Add library to read into
Arrow memory
Posted by we...@apache.org.
PARQUET-712: Add library to read into Arrow memory
At the moment this is just a move of the existing code into a state where it compiles. Outstanding work includes:
- [x] Understand the issues with ParquetException typeid matching in the tests *on macOS*. @wesm We already had this problem and you fixed it somewhere. Do you remember the solution?
- [x] Understand why BooleanType tests break with a Thrift exception
- [ ] Add functions that read directly into Arrow memory and not intermediate structures.
Author: Uwe L. Korn <uw...@xhochy.com>
Author: Korn, Uwe <Uw...@blue-yonder.com>
Closes #158 from xhochy/PARQUET-712 and squashes the following commits:
e55ab1f [Uwe L. Korn] verbose ctest output
62f0f88 [Uwe L. Korn] Add static linkage
fc2c316 [Uwe L. Korn] Style fixes
3f3e24b [Uwe L. Korn] Fix templating problem
45de044 [Uwe L. Korn] Style fixes for IO
1d39a60 [Uwe L. Korn] Import MemoryPool instead of declaring it
251262a [Uwe L. Korn] Style fixes
e0e1518 [Uwe L. Korn] Build parquet_arrow in Travis
874b33d [Korn, Uwe] Add boost libraries for Arrow
142a364 [Korn, Uwe] PARQUET-712: Add library to read into Arrow memory
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/4a7bf117
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/4a7bf117
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/4a7bf117
Branch: refs/heads/master
Commit: 4a7bf1174419db6b2e4fc1847d866c4763b7be44
Parents: 8ef68b1
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sun Sep 18 14:01:41 2016 -0400
Committer: Wes McKinney <we...@apache.org>
Committed: Sun Sep 18 14:01:41 2016 -0400
----------------------------------------------------------------------
.travis.yml | 6 +-
CMakeLists.txt | 35 ++
ci/travis_script_cpp.sh | 4 +-
cmake_modules/FindArrow.cmake | 87 ++++
conda.recipe/build.sh | 1 +
src/parquet/arrow/CMakeLists.txt | 90 ++++
src/parquet/arrow/arrow-io-test.cc | 189 ++++++++
src/parquet/arrow/arrow-reader-writer-test.cc | 502 +++++++++++++++++++++
src/parquet/arrow/arrow-schema-test.cc | 265 +++++++++++
src/parquet/arrow/io.cc | 107 +++++
src/parquet/arrow/io.h | 82 ++++
src/parquet/arrow/reader.cc | 406 +++++++++++++++++
src/parquet/arrow/reader.h | 148 ++++++
src/parquet/arrow/schema.cc | 351 ++++++++++++++
src/parquet/arrow/schema.h | 56 +++
src/parquet/arrow/test-util.h | 202 +++++++++
src/parquet/arrow/utils.h | 54 +++
src/parquet/arrow/writer.cc | 373 +++++++++++++++
src/parquet/arrow/writer.h | 78 ++++
thirdparty/build_thirdparty.sh | 10 +
thirdparty/download_thirdparty.sh | 5 +
thirdparty/set_thirdparty_env.sh | 1 +
thirdparty/versions.sh | 4 +
23 files changed, 3052 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 780d9f9..6dc994e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,6 +9,8 @@ addons:
- g++-4.9
- valgrind
- libboost-dev
+ - libboost-filesystem-dev
+ - libboost-system-dev
- libboost-program-options-dev
- libboost-test-dev
- libssl-dev
@@ -26,7 +28,7 @@ matrix:
before_script:
- source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh
- cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_TEST_MEMCHECK=ON -DPARQUET_BUILD_BENCHMARKS=ON
- -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR
+ -DPARQUET_ARROW=ON -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR
- export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data
- compiler: clang
os: linux
@@ -76,7 +78,7 @@ before_install:
before_script:
- source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh
-- cmake -DCMAKE_CXX_FLAGS="-Werror" $TRAVIS_BUILD_DIR
+- cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_ARROW=ON $TRAVIS_BUILD_DIR
- export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data
script:
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3878056..42b10ee 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,9 @@ if ("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(PARQUET_BUILD_EXECUTABLES
"Build the libparquet executable CLI tools"
ON)
+ option(PARQUET_ARROW
+ "Build the Arrow support"
+ OFF)
endif()
# If build in-source, create the latest symlink. If build out-of-source, which is
@@ -247,6 +250,16 @@ function(ADD_PARQUET_TEST_DEPENDENCIES REL_TEST_NAME)
add_dependencies(${TEST_NAME} ${ARGN})
endfunction()
+# A wrapper for target_link_libraries() that is compatible with PARQUET_BUILD_TESTS.
+# Does nothing unless PARQUET_BUILD_TESTS is set. The target name is the file
+# name of REL_TEST_NAME without directory and extension; all remaining
+# arguments are passed on as the libraries to link.
+function(ADD_PARQUET_LINK_LIBRARIES REL_TEST_NAME)
+ if(NOT PARQUET_BUILD_TESTS)
+ return()
+ endif()
+ get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE)
+
+ target_link_libraries(${TEST_NAME} ${ARGN})
+endfunction()
+
enable_testing()
############################################################
@@ -553,6 +566,12 @@ if (PARQUET_BUILD_SHARED)
target_link_libraries(parquet_shared
LINK_PUBLIC ${LIBPARQUET_LINK_LIBS}
LINK_PRIVATE ${LIBPARQUET_PRIVATE_LINK_LIBS})
+ if (APPLE)
+ set_target_properties(parquet_shared
+ PROPERTIES
+ BUILD_WITH_INSTALL_RPATH ON
+ INSTALL_NAME_DIR "@rpath")
+ endif()
endif()
if (PARQUET_BUILD_STATIC)
@@ -583,6 +602,22 @@ add_dependencies(parquet_objlib parquet_thrift)
add_subdirectory(benchmarks)
add_subdirectory(tools)
+# Arrow
+if (PARQUET_ARROW)
+ find_package(Arrow REQUIRED)
+ include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
+ add_library(arrow SHARED IMPORTED)
+ set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${ARROW_SHARED_LIB})
+ add_library(arrow_io SHARED IMPORTED)
+ set_target_properties(arrow_io PROPERTIES IMPORTED_LOCATION ${ARROW_IO_SHARED_LIB})
+ add_library(arrow_static STATIC IMPORTED)
+ set_target_properties(arrow_static PROPERTIES IMPORTED_LOCATION ${ARROW_STATIC_LIB})
+ add_library(arrow_io_static STATIC IMPORTED)
+ set_target_properties(arrow_io_static PROPERTIES IMPORTED_LOCATION ${ARROW_IO_STATIC_LIB})
+
+ add_subdirectory(src/parquet/arrow)
+endif()
+
add_custom_target(clean-all
COMMAND ${CMAKE_BUILD_TOOL} clean
COMMAND ${CMAKE_COMMAND} -P cmake_modules/clean-all.cmake
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/ci/travis_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh
index 2e7dcdd..8794559 100755
--- a/ci/travis_script_cpp.sh
+++ b/ci/travis_script_cpp.sh
@@ -16,13 +16,13 @@ make lint
if [ $TRAVIS_OS_NAME == "linux" ]; then
make -j4 || exit 1
- ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+ ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
sudo pip install cpp_coveralls
export PARQUET_ROOT=$TRAVIS_BUILD_DIR
$TRAVIS_BUILD_DIR/ci/upload_coverage.sh
else
make -j4 || exit 1
- ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+ ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
fi
popd
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/cmake_modules/FindArrow.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindArrow.cmake b/cmake_modules/FindArrow.cmake
new file mode 100644
index 0000000..91d0e71
--- /dev/null
+++ b/cmake_modules/FindArrow.cmake
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# - Find ARROW (arrow/array.h, libarrow.a, libarrow.so)
+# This module defines
+# ARROW_INCLUDE_DIR, directory containing headers
+# ARROW_LIBS, directory containing arrow libraries
+# ARROW_STATIC_LIB, path to libarrow.a
+# ARROW_SHARED_LIB, path to libarrow's shared library
+# ARROW_IO_STATIC_LIB, path to libarrow_io.a
+# ARROW_IO_SHARED_LIB, path to libarrow_io's shared library
+# ARROW_FOUND, whether arrow has been found
+
+set(ARROW_SEARCH_HEADER_PATHS
+ $ENV{ARROW_HOME}/include
+)
+
+set(ARROW_SEARCH_LIB_PATH
+ $ENV{ARROW_HOME}/lib
+)
+
+find_path(ARROW_INCLUDE_DIR arrow/array.h PATHS
+ ${ARROW_SEARCH_HEADER_PATHS}
+ # make sure we don't accidentally pick up a different version
+ NO_DEFAULT_PATH
+)
+
+find_library(ARROW_LIB_PATH NAMES arrow
+ PATHS
+ ${ARROW_SEARCH_LIB_PATH}
+ NO_DEFAULT_PATH)
+
+find_library(ARROW_IO_LIB_PATH NAMES arrow_io
+ PATHS
+ ${ARROW_SEARCH_LIB_PATH}
+ NO_DEFAULT_PATH)
+
+if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
+ set(ARROW_FOUND TRUE)
+ set(ARROW_LIB_NAME libarrow)
+ set(ARROW_IO_LIB_NAME libarrow_io)
+
+ set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
+ set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
+ set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+
+ set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a)
+ set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+ if (NOT Arrow_FIND_QUIETLY)
+ message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
+ message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}")
+ endif ()
+else ()
+ if (NOT Arrow_FIND_QUIETLY)
+ set(ARROW_ERR_MSG "Could not find the Arrow library. Looked for headers")
+ set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and for libs")
+ set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}")
+ if (Arrow_FIND_REQUIRED)
+ message(FATAL_ERROR "${ARROW_ERR_MSG}")
+ else (Arrow_FIND_REQUIRED)
+ message(STATUS "${ARROW_ERR_MSG}")
+ endif (Arrow_FIND_REQUIRED)
+ endif ()
+ set(ARROW_FOUND FALSE)
+endif ()
+
+mark_as_advanced(
+ ARROW_FOUND
+ ARROW_INCLUDE_DIR
+ ARROW_LIBS
+ ARROW_STATIC_LIB
+ ARROW_SHARED_LIB
+ ARROW_IO_STATIC_LIB
+ ARROW_IO_SHARED_LIB
+)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/conda.recipe/build.sh
----------------------------------------------------------------------
diff --git a/conda.recipe/build.sh b/conda.recipe/build.sh
index 7fe8f91..f77475c 100644
--- a/conda.recipe/build.sh
+++ b/conda.recipe/build.sh
@@ -53,6 +53,7 @@ cmake \
-DCMAKE_BUILD_TYPE=debug \
-DCMAKE_INSTALL_PREFIX=$PREFIX \
-DPARQUET_BUILD_BENCHMARKS=off \
+ -DPARQUET_ARROW=ON \
..
make
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/CMakeLists.txt b/src/parquet/arrow/CMakeLists.txt
new file mode 100644
index 0000000..5d923a7
--- /dev/null
+++ b/src/parquet/arrow/CMakeLists.txt
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ----------------------------------------------------------------------
+# parquet_arrow : Arrow <-> Parquet adapter
+
+set(PARQUET_ARROW_SRCS
+ io.cc
+ reader.cc
+ schema.cc
+ writer.cc
+)
+
+add_library(parquet_arrow_objlib OBJECT
+ ${PARQUET_ARROW_SRCS}
+)
+
+# SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX)
+
+if (PARQUET_BUILD_SHARED)
+ add_library(parquet_arrow_shared SHARED $<TARGET_OBJECTS:parquet_arrow_objlib>)
+ set_target_properties(parquet_arrow_shared
+ PROPERTIES
+ LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+ LINK_FLAGS "${SHARED_LINK_FLAGS}"
+ OUTPUT_NAME "parquet_arrow")
+ target_link_libraries(parquet_arrow_shared
+ arrow
+ arrow_io
+ parquet_shared)
+ if (APPLE)
+ set_target_properties(parquet_arrow_shared
+ PROPERTIES
+ BUILD_WITH_INSTALL_RPATH ON
+ INSTALL_NAME_DIR "@rpath")
+ endif()
+endif()
+
+# Static variant of the Arrow <-> Parquet adapter library.
+if (PARQUET_BUILD_STATIC)
+  add_library(parquet_arrow_static STATIC $<TARGET_OBJECTS:parquet_arrow_objlib>)
+  set_target_properties(parquet_arrow_static
+    PROPERTIES
+    LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+    OUTPUT_NAME "parquet_arrow")
+  # Like the shared variant, link the Arrow core and the Arrow IO library
+  # rather than listing the core library twice.
+  target_link_libraries(parquet_arrow_static
+    arrow_static
+    arrow_io_static
+    parquet_static)
+  install(TARGETS parquet_arrow_static
+    ARCHIVE DESTINATION lib
+    LIBRARY DESTINATION lib)
+endif()
+
+ADD_PARQUET_TEST(arrow-schema-test)
+ADD_PARQUET_TEST(arrow-io-test)
+ADD_PARQUET_TEST(arrow-reader-writer-test)
+
+if (PARQUET_BUILD_STATIC)
+ ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_static)
+ ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_static)
+ ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_static)
+else()
+ ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_shared)
+ ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_shared)
+ ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_shared)
+endif()
+
+# Headers: top level
+install(FILES
+ io.h
+ reader.h
+ schema.h
+ utils.h
+ writer.h
+ DESTINATION include/parquet/arrow)
+
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-io-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-io-test.cc b/src/parquet/arrow/arrow-io-test.cc
new file mode 100644
index 0000000..377fb97
--- /dev/null
+++ b/src/parquet/arrow/arrow-io-test.cc
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/io.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+using arrow::Status;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// Allocator tests
+
+// A default-constructed allocator must hand out memory that can be written to
+// and accept it back.
+TEST(TestParquetAllocator, DefaultCtor) {
+ ParquetAllocator allocator;
+
+ const int buffer_size = 10;
+
+ uint8_t* buffer = nullptr;
+ ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size););
+
+ // valgrind will complain if we write into nullptr
+ memset(buffer, 0, buffer_size);
+
+ allocator.Free(buffer, buffer_size);
+}
+
+// Pass through to the default memory pool while keeping a running total of the
+// bytes that are currently allocated through this pool.
+class TrackingPool : public MemoryPool {
+ public:
+ TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {}
+
+ // Counts `size` only when the underlying allocation succeeded.
+ Status Allocate(int64_t size, uint8_t** out) override {
+ RETURN_NOT_OK(pool_->Allocate(size, out));
+ bytes_allocated_ += size;
+ return Status::OK();
+ }
+
+ // Releases `buffer` to the underlying pool and subtracts `size` from the total.
+ void Free(uint8_t* buffer, int64_t size) override {
+ pool_->Free(buffer, size);
+ bytes_allocated_ -= size;
+ }
+
+ int64_t bytes_allocated() const override { return bytes_allocated_; }
+
+ private:
+ MemoryPool* pool_;
+ int64_t bytes_allocated_;
+};
+
+// An allocator built on a custom pool must route both Malloc and Free through
+// that pool, which the pool's byte counter makes observable.
+TEST(TestParquetAllocator, CustomPool) {
+ TrackingPool pool;
+
+ ParquetAllocator allocator(&pool);
+
+ ASSERT_EQ(&pool, allocator.pool());
+
+ const int buffer_size = 10;
+
+ uint8_t* buffer = nullptr;
+ ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size););
+
+ ASSERT_EQ(buffer_size, pool.bytes_allocated());
+
+ // valgrind will complain if we write into nullptr
+ memset(buffer, 0, buffer_size);
+
+ allocator.Free(buffer, buffer_size);
+
+ ASSERT_EQ(0, pool.bytes_allocated());
+}
+
+// ----------------------------------------------------------------------
+// Read source tests
+
+// Random access file backed by a byte buffer that the caller owns. Used by the
+// tests below as the Arrow-side input of a ParquetReadSource.
+class BufferReader : public ArrowROFile {
+ public:
+  BufferReader(const uint8_t* buffer, int buffer_size)
+      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+
+  Status Close() override {
+    // no-op
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* position) override {
+    *position = position_;
+    return Status::OK();
+  }
+
+  // Seeks to `position` and reads from there; fails if the seek fails.
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    RETURN_NOT_OK(Seek(position));
+    return Read(nbytes, bytes_read, buffer);
+  }
+
+  // Copies at most `nbytes` bytes from the current position into `buffer`,
+  // reports the number actually copied in `bytes_read` and advances the
+  // position by that amount.
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    // Clamp to the bytes that are left before copying, so that a request
+    // reaching past the end does not read outside the backing buffer.
+    *bytes_read = std::min(nbytes, buffer_size_ - position_);
+    memcpy(buffer, buffer_ + position_, *bytes_read);
+    position_ += *bytes_read;
+    return Status::OK();
+  }
+
+  Status GetSize(int64_t* size) override {
+    *size = buffer_size_;
+    return Status::OK();
+  }
+
+  // Only positions inside the buffer are accepted; the end itself is rejected.
+  Status Seek(int64_t position) override {
+    if (position < 0 || position >= buffer_size_) {
+      return Status::IOError("position out of bounds");
+    }
+
+    position_ = position;
+    return Status::OK();
+  }
+
+ private:
+  const uint8_t* buffer_;
+  int buffer_size_;
+  int64_t position_;
+};
+
+// Exercises Tell, Seek and both Read overloads of ParquetReadSource on top of
+// an in-memory BufferReader.
+TEST(TestParquetReadSource, Basics) {
+ std::string data = "this is the data";
+ auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
+
+ ParquetAllocator allocator(default_memory_pool());
+
+ auto file = std::make_shared<BufferReader>(data_buffer, data.size());
+ auto source = std::make_shared<ParquetReadSource>(&allocator);
+
+ ASSERT_OK(source->Open(file));
+
+ ASSERT_EQ(0, source->Tell());
+ ASSERT_NO_THROW(source->Seek(5));
+ ASSERT_EQ(5, source->Tell());
+ ASSERT_NO_THROW(source->Seek(0));
+
+ // Seek out of bounds
+ ASSERT_THROW(source->Seek(100), ParquetException);
+
+ uint8_t buffer[50];
+
+ // Read into a caller-provided buffer; the position advances by the bytes read.
+ ASSERT_NO_THROW(source->Read(4, buffer));
+ ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
+ ASSERT_EQ(4, source->Tell());
+
+ std::shared_ptr<Buffer> pq_buffer;
+
+ // Read into a buffer returned by the source, continuing after the first read.
+ ASSERT_NO_THROW(pq_buffer = source->Read(7));
+
+ auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
+
+ ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
+}
+
+} // namespace arrow
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
new file mode 100644
index 0000000..e4e9efa
--- /dev/null
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "parquet/api/reader.h"
+#include "parquet/api/writer.h"
+
+#include "parquet/arrow/test-util.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+#include "arrow/test-util.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+using arrow::Array;
+using arrow::ChunkedArray;
+using arrow::default_memory_pool;
+using arrow::PoolBuffer;
+using arrow::PrimitiveArray;
+using arrow::Status;
+using arrow::Table;
+
+using ParquetBuffer = parquet::Buffer;
+using ParquetType = parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::PrimitiveNode;
+
+namespace parquet {
+
+namespace arrow {
+
+// Element counts used by the tests below for small and large arrays.
+const int SMALL_SIZE = 100;
+const int LARGE_SIZE = 10000;
+
+// Per-Arrow-type test parameters. Each specialization below provides:
+// parquet_enum - the Parquet physical type used for the test column,
+// logical_enum - the Parquet logical type annotation for the test column,
+// value - a sample value of the matching C type.
+template <typename TestType>
+struct test_traits {};
+
+// BooleanType: Parquet BOOLEAN with no logical annotation.
+template <>
+struct test_traits<::arrow::BooleanType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static uint8_t const value;
+};
+
+// Sample value for BooleanType, held in a uint8_t.
+const uint8_t test_traits<::arrow::BooleanType>::value(1);
+
+// UInt8Type: Parquet INT32 annotated as UINT_8.
+template <>
+struct test_traits<::arrow::UInt8Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
+ static uint8_t const value;
+};
+
+// Sample value for UInt8Type.
+const uint8_t test_traits<::arrow::UInt8Type>::value(64);
+
+// Int8Type: Parquet INT32 annotated as INT_8.
+template <>
+struct test_traits<::arrow::Int8Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
+ static int8_t const value;
+};
+
+// Sample value for Int8Type.
+const int8_t test_traits<::arrow::Int8Type>::value(-64);
+
+// UInt16Type: Parquet INT32 annotated as UINT_16.
+template <>
+struct test_traits<::arrow::UInt16Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
+ static uint16_t const value;
+};
+
+// Sample value for UInt16Type.
+const uint16_t test_traits<::arrow::UInt16Type>::value(1024);
+
+// Int16Type: Parquet INT32 annotated as INT_16.
+template <>
+struct test_traits<::arrow::Int16Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
+ static int16_t const value;
+};
+
+// Sample value for Int16Type.
+const int16_t test_traits<::arrow::Int16Type>::value(-1024);
+
+// UInt32Type: Parquet INT32 annotated as UINT_32.
+template <>
+struct test_traits<::arrow::UInt32Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
+ static uint32_t const value;
+};
+
+// Sample value for UInt32Type.
+const uint32_t test_traits<::arrow::UInt32Type>::value(1024);
+
+// Int32Type: Parquet INT32 with no logical annotation.
+template <>
+struct test_traits<::arrow::Int32Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static int32_t const value;
+};
+
+// Sample value for Int32Type.
+const int32_t test_traits<::arrow::Int32Type>::value(-1024);
+
+// UInt64Type: Parquet INT64 annotated as UINT_64.
+template <>
+struct test_traits<::arrow::UInt64Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
+ static uint64_t const value;
+};
+
+// Sample value for UInt64Type.
+const uint64_t test_traits<::arrow::UInt64Type>::value(1024);
+
+// Int64Type: Parquet INT64 with no logical annotation.
+template <>
+struct test_traits<::arrow::Int64Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static int64_t const value;
+};
+
+// Sample value for Int64Type.
+const int64_t test_traits<::arrow::Int64Type>::value(-1024);
+
+// TimestampType: Parquet INT64 annotated as TIMESTAMP_MILLIS.
+template <>
+struct test_traits<::arrow::TimestampType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS;
+ static int64_t const value;
+};
+
+// Sample value for TimestampType.
+const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
+
+// FloatType: Parquet FLOAT with no logical annotation.
+template <>
+struct test_traits<::arrow::FloatType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static float const value;
+};
+
+// Sample value for FloatType.
+const float test_traits<::arrow::FloatType>::value(2.1f);
+
+// DoubleType: Parquet DOUBLE with no logical annotation.
+template <>
+struct test_traits<::arrow::DoubleType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static double const value;
+};
+
+// Sample value for DoubleType.
+const double test_traits<::arrow::DoubleType>::value(4.2);
+
+// StringType: Parquet BYTE_ARRAY annotated as UTF8.
+template <>
+struct test_traits<::arrow::StringType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
+ static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
+ static std::string const value;
+};
+
+// Sample value for StringType.
+const std::string test_traits<::arrow::StringType>::value("Test");
+
+// Parquet DataType selected by the physical type that test_traits<T> declares.
+template <typename T>
+using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
+
+// Typed Parquet column writer matching ParquetDataType<T>.
+template <typename T>
+using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
+
+// Fixture that writes a single-column Parquet file into an in-memory sink and
+// reads it back as Arrow data. TestType is the Arrow type under test; the
+// Parquet physical and logical types come from test_traits<TestType>.
+template <typename TestType>
+class TestParquetIO : public ::testing::Test {
+ public:
+ virtual void SetUp() {}
+
+ // Builds a group node named "schema" containing one primitive column
+ // "column1" with the given repetition.
+ std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
+ auto pnode = PrimitiveNode::Make("column1", repetition,
+ test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
+ NodePtr node_ =
+ GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+ return std::static_pointer_cast<GroupNode>(node_);
+ }
+
+ // Replaces sink_ with a fresh in-memory stream and opens a file writer on it.
+ std::unique_ptr<ParquetFileWriter> MakeWriter(
+ const std::shared_ptr<GroupNode>& schema) {
+ sink_ = std::make_shared<InMemoryOutputStream>();
+ return ParquetFileWriter::Open(sink_, schema);
+ }
+
+ // Opens a file reader over the buffer currently held by sink_.
+ std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+ std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+ std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+ return ParquetFileReader::Open(std::move(source));
+ }
+
+ // Reads one batch of SMALL_SIZE values from column 0 into *out.
+ void ReadSingleColumnFile(
+ std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+ FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+ std::unique_ptr<FlatColumnReader> column_reader;
+ ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
+ ASSERT_NE(nullptr, column_reader.get());
+
+ ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+ ASSERT_NE(nullptr, out->get());
+ }
+
+ // Reads column 0 back from sink_ and asserts that it equals `values`.
+ void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
+ std::shared_ptr<::arrow::Array> out;
+ ReadSingleColumnFile(ReaderFromSink(), &out);
+ ASSERT_TRUE(values->Equals(out));
+ }
+
+ // Reads the whole file into *out via ReadFlatTable.
+ void ReadTableFromFile(
+ std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+ FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+ ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
+ ASSERT_NE(nullptr, out->get());
+ }
+
+ // Reads the table back from sink_ and asserts that it has exactly one column,
+ // as many rows as `values`, and a single chunk equal to `values`.
+ void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+ std::shared_ptr<::arrow::Table> out;
+ ReadTableFromFile(ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(values->length(), out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+ }
+
+ // Writes `values` as one row group of a new file using `schema`.
+ template <typename ArrayType>
+ void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<ArrayType>& values) {
+ FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+ ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+ ASSERT_OK_NO_THROW(writer.Close());
+ }
+
+ // In-memory stream that receives the written file; read back by ReaderFromSink().
+ std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+// We have separate tests for UInt32Type as this is currently the only type
+// where a roundtrip does not yield the identical Array structure.
+// There we write an UInt32 Array but receive an Int64 Array as result for
+// Parquet version 1.0.
+
+using TestTypes = ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type,
+ ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type,
+ ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType,
+ ::arrow::DoubleType, ::arrow::StringType>;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+// Writes one REQUIRED column as a single row group and reads it back through
+// the column reader.
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ this->WriteFlatColumn(schema, values);
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+// Writes a one-column table with WriteFlatTable, using the array length as the
+// chunk size, and reads it back as a table.
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+ this->sink_, values->length(), default_writer_properties()));
+
+ // The shared helper checks the column count, the row count against
+ // values->length(), and that the single chunk equals the written values.
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+// Writes one OPTIONAL column built by NullableArray as a single row group and
+// reads it back through the column reader.
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
+ // This also tests max_definition_level = 1
+ auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+ this->WriteFlatColumn(schema, values);
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+// Table-level variant: writes a one-column table built from NullableArray with
+// WriteFlatTable and checks the table read back.
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+ this->sink_, values->length(), default_writer_properties()));
+
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+// Writes one REQUIRED column as four equally sized row groups and reads the
+// column back in one batch.
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+ const int num_chunks = 4;
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+ const int64_t rows_per_chunk = values->length() / num_chunks;
+
+ auto schema = this->MakeSchema(Repetition::REQUIRED);
+ FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
+ for (int chunk = 0; chunk < num_chunks; ++chunk) {
+ // Each row group receives the next contiguous slice of the array.
+ const int64_t offset = chunk * rows_per_chunk;
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(rows_per_chunk));
+ ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get(), offset, rows_per_chunk));
+ }
+ ASSERT_OK_NO_THROW(writer.Close());
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+// Writes a LARGE_SIZE one-column table with a chunk size of 512 and checks that
+// it reads back as a single chunk equal to the input.
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+ auto values = NonNullArray<TypeParam>(LARGE_SIZE);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(
+ table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
+
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+// Writes one OPTIONAL column as four equally sized row groups and reads the
+// column back in one batch.
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+ const int num_chunks = 4;
+ const int64_t rows_per_chunk = SMALL_SIZE / num_chunks;
+ auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+ auto schema = this->MakeSchema(Repetition::OPTIONAL);
+ FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
+ for (int chunk = 0; chunk < num_chunks; ++chunk) {
+ // Each row group receives the next contiguous slice of the array.
+ const int64_t offset = chunk * rows_per_chunk;
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(rows_per_chunk));
+ ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get(), offset, rows_per_chunk));
+ }
+ ASSERT_OK_NO_THROW(writer.Close());
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+// Writes a LARGE_SIZE one-column table built from NullableArray with a chunk
+// size of 512 and checks the table read back.
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+ // This also tests max_definition_level = 1
+ auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+ this->sink_, 512, default_writer_properties()));
+
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+// Fixture instantiation used by the UInt32-specific version tests below.
+using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
+
+// Writes a UInt32 column with writer version PARQUET_2_0 and expects the table
+// read back to equal the original UInt32 values.
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<PrimitiveArray> values =
+ NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+ // Parquet 2.0 roundtrip should yield an uint32_t column again
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ std::shared_ptr<::parquet::WriterProperties> properties =
+ ::parquet::WriterProperties::Builder()
+ .version(ParquetVersion::PARQUET_2_0)
+ ->build();
+ ASSERT_OK_NO_THROW(
+ WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+// Writes a UInt32 column with writer version PARQUET_1_0 and expects the table
+// read back to be an Int64 array holding the same values, null count and null
+// bitmap.
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<PrimitiveArray> values =
+ NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+ // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
+ // reader that a column is unsigned.
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ std::shared_ptr<::parquet::WriterProperties> properties =
+ ::parquet::WriterProperties::Builder()
+ .version(ParquetVersion::PARQUET_1_0)
+ ->build();
+ ASSERT_OK_NO_THROW(WriteFlatTable(
+ table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties));
+
+ // Build the expected Int64 array by widening every uint32_t slot.
+ std::shared_ptr<Array> expected_values;
+ std::shared_ptr<PoolBuffer> int64_data =
+ std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
+ {
+ ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
+ int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+ const uint32_t* uint32_data_ptr =
+ reinterpret_cast<const uint32_t*>(values->data()->data());
+ // std::copy might be faster but this is explicit on the casts
+ for (int64_t i = 0; i < values->length(); i++) {
+ int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
+ }
+ }
+ ASSERT_OK(MakePrimitiveArray(std::make_shared<::arrow::Int64Type>(), values->length(),
+ int64_data, values->null_count(), values->null_bitmap(), &expected_values));
+ this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+// C type that Parquet uses for values of ParquetDataType<T>.
+template <typename T>
+using ParquetCDataType = typename ParquetDataType<T>::c_type;
+
+// Fixture for read-side tests: the file is produced with the low-level Parquet
+// column writer and then read back through the Arrow reader.
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+ typedef typename TestType::c_type T;
+
+ // Writes `values` as a REQUIRED column split into num_chunks row groups of
+ // values.size() / num_chunks rows each, then opens a reader on the result.
+ void MakeTestFile(std::vector<T>& values, int num_chunks,
+ std::unique_ptr<ParquetFileReader>* file_reader) {
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+ size_t chunk_size = values.size() / num_chunks;
+ // Convert to Parquet's expected physical type
+ std::vector<uint8_t> values_buffer(
+ sizeof(ParquetCDataType<TestType>) * values.size());
+ auto values_parquet =
+ reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+ std::copy(values.cbegin(), values.cend(), values_parquet);
+ for (int i = 0; i < num_chunks; i++) {
+ auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+ auto column_writer =
+ static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+ ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
+ // No definition or repetition levels are passed for the REQUIRED column.
+ column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+ column_writer->Close();
+ row_group_writer->Close();
+ }
+ file_writer->Close();
+ *file_reader = this->ReaderFromSink();
+ }
+
+ // Writes SMALL_SIZE copies of test_traits<TestType>::value in num_chunks row
+ // groups and checks the table read back: one column, SMALL_SIZE rows, and a
+ // single chunk matching the written values.
+ void CheckSingleColumnRequiredTableRead(int num_chunks) {
+ std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+ std::unique_ptr<ParquetFileReader> file_reader;
+ ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(std::move(file_reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
+ }
+
+ // Same data as above, but read back through the single-column reader.
+ void CheckSingleColumnRequiredRead(int num_chunks) {
+ std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+ std::unique_ptr<ParquetFileReader> file_reader;
+ ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+ std::shared_ptr<Array> out;
+ this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+ ExpectArray<TestType>(values.data(), out.get());
+ }
+};
+
+using PrimitiveTestTypes = ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type,
+ ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type,
+ ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
+ ::arrow::DoubleType>;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+// Column read of a file written as a single row group.
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+ this->CheckSingleColumnRequiredRead(1);
+}
+
+// Table read of a file written as a single row group.
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+ this->CheckSingleColumnRequiredTableRead(1);
+}
+
+// Column read of a file written as four row groups.
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+ this->CheckSingleColumnRequiredRead(4);
+}
+
+// Table read of a file written as four row groups.
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+ this->CheckSingleColumnRequiredTableRead(4);
+}
+
+} // namespace arrow
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
new file mode 100644
index 0000000..3dfaf14
--- /dev/null
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/schema.h"
+
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+#include "arrow/types/datetime.h"
+#include "arrow/types/decimal.h"
+#include "arrow/util/status.h"
+
+using arrow::Field;
+
+using ParquetType = parquet::Type;
+using parquet::LogicalType;
+using parquet::Repetition;
+using parquet::schema::NodePtr;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+namespace parquet {
+
+namespace arrow {
+
+// Shared Arrow type instances used to build the expected fields in the tests below.
+const auto BOOL = std::make_shared<::arrow::BooleanType>();
+const auto UINT8 = std::make_shared<::arrow::UInt8Type>();
+const auto INT32 = std::make_shared<::arrow::Int32Type>();
+const auto INT64 = std::make_shared<::arrow::Int64Type>();
+const auto FLOAT = std::make_shared<::arrow::FloatType>();
+const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
+const auto UTF8 = std::make_shared<::arrow::StringType>();
+const auto TIMESTAMP_MS =
+ std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+// TODO: This requires parquet-cpp implementing the MICROS enum value
+// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
+// BINARY is represented here as a list of UINT8 with an unnamed item field.
+const auto BINARY =
+ std::make_shared<::arrow::ListType>(std::make_shared<Field>("", UINT8));
+// Decimal type constructed with arguments (8, 4).
+const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
+
+// Fixture for Parquet -> Arrow schema conversion. ConvertSchema() stores the
+// converted schema in result_schema_; CheckFlatSchema() compares it field by
+// field against an expected Arrow schema.
+class TestConvertParquetSchema : public ::testing::Test {
+ public:
+ virtual void SetUp() {}
+
+ // Asserts matching field counts, then reports every mismatching field.
+ void CheckFlatSchema(const std::shared_ptr<::arrow::Schema>& expected_schema) {
+ const auto expected_count = expected_schema->num_fields();
+ ASSERT_EQ(expected_count, result_schema_->num_fields());
+ for (int idx = 0; idx < expected_count; ++idx) {
+ auto actual = result_schema_->field(idx);
+ auto expected = expected_schema->field(idx);
+ EXPECT_TRUE(actual->Equals(expected))
+ << idx << " " << actual->ToString() << " != " << expected->ToString();
+ }
+ }
+
+ // Wraps `nodes` in a group named "schema", initializes descr_ from it and
+ // converts the descriptor into result_schema_.
+ ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes) {
+ NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+ descr_.Init(root);
+ return FromParquetSchema(&descr_, &result_schema_);
+ }
+
+ protected:
+ SchemaDescriptor descr_;
+ std::shared_ptr<::arrow::Schema> result_schema_;
+};
+
+// Builds parallel lists of Parquet primitive nodes and the Arrow fields they
+// are expected to convert to, then converts and compares them.
+TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
+ std::vector<NodePtr> parquet_fields;
+ std::vector<std::shared_ptr<Field>> arrow_fields;
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
+ arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
+ arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
+ arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+
+ parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+ ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
+ arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+
+ // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+ // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+ // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
+ arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
+ arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+
+ // BYTE_ARRAY without a logical type is expected to convert to BINARY.
+ parquet_fields.push_back(
+ PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY));
+ arrow_fields.push_back(std::make_shared<Field>("binary", BINARY));
+
+ parquet_fields.push_back(PrimitiveNode::Make(
+ "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8));
+ arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+
+ // FIXED_LEN_BYTE_ARRAY without a logical type is also expected to be BINARY.
+ parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL,
+ ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12));
+ arrow_fields.push_back(std::make_shared<Field>("flba-binary", BINARY));
+
+ auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ ASSERT_OK(ConvertSchema(parquet_fields));
+
+ CheckFlatSchema(arrow_schema);
+}
+
+// DECIMAL-annotated nodes on four different physical types are all expected to
+// convert to the same Arrow type, DECIMAL_8_4.
+TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
+ std::vector<NodePtr> parquet_fields;
+ std::vector<std::shared_ptr<Field>> arrow_fields;
+
+ parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL,
+ ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4));
+ arrow_fields.push_back(std::make_shared<Field>("flba-decimal", DECIMAL_8_4));
+
+ parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL,
+ ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4));
+ arrow_fields.push_back(std::make_shared<Field>("binary-decimal", DECIMAL_8_4));
+
+ parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL,
+ ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4));
+ arrow_fields.push_back(std::make_shared<Field>("int32-decimal", DECIMAL_8_4));
+
+ parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL,
+ ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4));
+ arrow_fields.push_back(std::make_shared<Field>("int64-decimal", DECIMAL_8_4));
+
+ auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ ASSERT_OK(ConvertSchema(parquet_fields));
+
+ CheckFlatSchema(arrow_schema);
+}
+
+// Each listed node is converted on its own in a one-node schema, and the
+// conversion is expected to return NotImplemented.
+TEST_F(TestConvertParquetSchema, UnsupportedThings) {
+ std::vector<NodePtr> unsupported_nodes;
+
+ unsupported_nodes.push_back(
+ PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));
+
+ unsupported_nodes.push_back(
+ GroupNode::Make("repeated-group", Repetition::REPEATED, {}));
+
+ unsupported_nodes.push_back(PrimitiveNode::Make(
+ "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
+
+ for (const NodePtr& node : unsupported_nodes) {
+ ASSERT_RAISES(NotImplemented, ConvertSchema({node}));
+ }
+}
+
+// Fixture for Arrow -> Parquet schema conversion. ConvertSchema() stores the
+// resulting descriptor in result_schema_; CheckFlatSchema() compares its
+// top-level fields against a list of expected Parquet nodes.
+class TestConvertArrowSchema : public ::testing::Test {
+ public:
+ virtual void SetUp() {}
+
+ // Wraps `nodes` in a group named "schema" and compares it field by field with
+ // the group node of the converted schema.
+ void CheckFlatSchema(const std::vector<NodePtr>& nodes) {
+ NodePtr expected_root = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+ const auto* expected_group = static_cast<const GroupNode*>(expected_root.get());
+ const GroupNode* actual_group = result_schema_->group_node();
+
+ ASSERT_EQ(expected_group->field_count(), actual_group->field_count());
+
+ for (int idx = 0; idx < expected_group->field_count(); ++idx) {
+ auto actual = actual_group->field(idx);
+ auto expected = expected_group->field(idx);
+ EXPECT_TRUE(actual->Equals(expected.get()));
+ }
+ }
+
+ // Builds an Arrow schema from `fields` and converts it with the default
+ // writer properties.
+ ::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
+ arrow_schema_ = std::make_shared<::arrow::Schema>(fields);
+ std::shared_ptr<::parquet::WriterProperties> writer_props =
+ ::parquet::default_writer_properties();
+ return ToParquetSchema(arrow_schema_.get(), *writer_props, &result_schema_);
+ }
+
+ protected:
+ std::shared_ptr<::arrow::Schema> arrow_schema_;
+ std::shared_ptr<SchemaDescriptor> result_schema_;
+};
+
+// Builds parallel lists of Arrow fields and the Parquet nodes they are expected
+// to convert to, then converts the Arrow fields and compares.
+TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
+ std::vector<NodePtr> parquet_fields;
+ std::vector<std::shared_ptr<Field>> arrow_fields;
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
+ arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
+ arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
+ arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+
+ parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+ ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
+ arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+
+ // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+ // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+ // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
+ arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
+ arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+
+ // TODO: String types need to be clarified a bit more in the Arrow spec
+ parquet_fields.push_back(PrimitiveNode::Make(
+ "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8));
+ arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+
+ ASSERT_OK(ConvertSchema(arrow_fields));
+
+ CheckFlatSchema(parquet_fields);
+}
+
+// Placeholder: both field lists are still empty, so this only checks that an
+// empty Arrow schema converts to an empty Parquet schema.
+TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
+ std::vector<NodePtr> parquet_fields;
+ std::vector<std::shared_ptr<Field>> arrow_fields;
+
+ // TODO: Test Decimal Arrow -> Parquet conversion
+
+ ASSERT_OK(ConvertSchema(arrow_fields));
+
+ CheckFlatSchema(parquet_fields);
+}
+
+// TODO: placeholder test with an empty body; no date/time checks exist yet.
+TEST(TestNodeConversion, DateAndTime) {}
+
+} // namespace arrow
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/io.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc
new file mode 100644
index 0000000..8e2645a
--- /dev/null
+++ b/src/parquet/arrow/io.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/io.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/util/status.h"
+
+using arrow::Status;
+using arrow::MemoryPool;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// ParquetAllocator
+
+// Default constructor: allocations go to Arrow's default memory pool.
+ParquetAllocator::ParquetAllocator() : pool_(::arrow::default_memory_pool()) {}
+
+// Uses the given pool for all allocations; the pointer is stored as-is.
+ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}
+
+ParquetAllocator::~ParquetAllocator() {}
+
+// Requests `size` bytes from the pool and returns the resulting pointer. A
+// non-OK status from the pool is raised through PARQUET_THROW_NOT_OK.
+uint8_t* ParquetAllocator::Malloc(int64_t size) {
+ uint8_t* block = nullptr;
+ PARQUET_THROW_NOT_OK(pool_->Allocate(size, &block));
+ return block;
+}
+
+// Hands `buffer` and its `size` back to the pool.
+void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
+ // Does not report Status
+ pool_->Free(buffer, size);
+}
+
+// ----------------------------------------------------------------------
+// ParquetReadSource
+
+// Starts without a file; `allocator` is stored and later passed to the buffers
+// created by Read(int64_t). Open() must supply the file.
+ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator)
+ : file_(nullptr), allocator_(allocator) {}
+
+// Attaches `file` to this source and records its size. If the size query
+// fails, its status is returned and no member is modified.
+Status ParquetReadSource::Open(const std::shared_ptr<ArrowROFile>& file) {
+ int64_t reported_size;
+ RETURN_NOT_OK(file->GetSize(&reported_size));
+
+ size_ = reported_size;
+ file_ = file;
+ return Status::OK();
+}
+
+// Currently a no-op: the underlying file is intentionally left open (see TODO).
+void ParquetReadSource::Close() {
+ // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for
+ // these classes in a borked state. Probably better to explicitly close.
+
+ // PARQUET_THROW_NOT_OK(file_->Close());
+}
+
+// Returns the position reported by the underlying file. A non-OK status is
+// raised through PARQUET_THROW_NOT_OK.
+int64_t ParquetReadSource::Tell() const {
+ int64_t current_offset;
+ PARQUET_THROW_NOT_OK(file_->Tell(&current_offset));
+ return current_offset;
+}
+
+// Forwards the seek to the underlying file. A non-OK status is raised through
+// PARQUET_THROW_NOT_OK.
+void ParquetReadSource::Seek(int64_t position) {
+ PARQUET_THROW_NOT_OK(file_->Seek(position));
+}
+
+// Reads up to `nbytes` from the underlying file into `out` and returns the
+// byte count the file reports. A non-OK status is raised through
+// PARQUET_THROW_NOT_OK.
+int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
+ int64_t num_read;
+ PARQUET_THROW_NOT_OK(file_->Read(nbytes, &num_read, out));
+ return num_read;
+}
+
+// Reads up to `nbytes` into a newly created buffer that uses allocator_. The
+// buffer is shrunk to the number of bytes actually read when the read is short.
+std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
+ // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
+ // that there should be more code sharing amongst file-like sources
+ auto owned = std::make_shared<OwnedMutableBuffer>(0, allocator_);
+ owned->Resize(nbytes);
+
+ const int64_t num_read = Read(nbytes, owned->mutable_data());
+ if (num_read < nbytes) { owned->Resize(num_read); }
+ return owned;
+}
+
+} // namespace arrow
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/io.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h
new file mode 100644
index 0000000..dc60635
--- /dev/null
+++ b/src/parquet/arrow/io.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces
+
+#ifndef PARQUET_ARROW_IO_H
+#define PARQUET_ARROW_IO_H
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/memory-pool.h"
+
+namespace parquet {
+
+namespace arrow {
+
+// An implementation of the Parquet MemoryAllocator API that plugs into an
+// existing Arrow memory pool. This way we can direct all allocations to a
+// single place rather than tracking allocations in different locations (for
+// example: without utilizing parquet-cpp's default allocator)
+class PARQUET_EXPORT ParquetAllocator : public MemoryAllocator {
+ public:
+ // Uses the default memory pool
+ ParquetAllocator();
+
+ explicit ParquetAllocator(::arrow::MemoryPool* pool);
+ virtual ~ParquetAllocator();
+
+ uint8_t* Malloc(int64_t size) override;
+ void Free(uint8_t* buffer, int64_t size) override;
+
+ void set_pool(::arrow::MemoryPool* pool) { pool_ = pool; }
+
+ ::arrow::MemoryPool* pool() const { return pool_; }
+
+ private:
+ ::arrow::MemoryPool* pool_;
+};
+
+class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
+ public:
+ explicit ParquetReadSource(ParquetAllocator* allocator);
+
+ // We need to ask for the file size on opening the file, and this can fail
+ ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file);
+
+ void Close() override;
+ int64_t Tell() const override;
+ void Seek(int64_t pos) override;
+ int64_t Read(int64_t nbytes, uint8_t* out) override;
+ std::shared_ptr<Buffer> Read(int64_t nbytes) override;
+
+ private:
+ // An Arrow readable file of some kind
+ std::shared_ptr<::arrow::io::RandomAccessFile> file_;
+
+ // The allocator is required for creating managed buffers
+ ParquetAllocator* allocator_;
+};
+
+} // namespace arrow
+} // namespace parquet
+
+#endif // PARQUET_ARROW_IO_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
new file mode 100644
index 0000000..056b5ab
--- /dev/null
+++ b/src/parquet/arrow/reader.cc
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/reader.h"
+
+#include <algorithm>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "parquet/arrow/io.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/column.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::Array;
+using arrow::Column;
+using arrow::Field;
+using arrow::MemoryPool;
+using arrow::PoolBuffer;
+using arrow::Status;
+using arrow::Table;
+
+// Help reduce verbosity
+using ParquetRAS = parquet::RandomAccessSource;
+using ParquetReader = parquet::ParquetFileReader;
+
+namespace parquet {
+namespace arrow {
+
+template <typename ArrowType>
+struct ArrowTypeTraits {
+ typedef ::arrow::NumericBuilder<ArrowType> builder_type;
+};
+
+template <>  // Keyed on the Arrow type: NextBatch instantiates with ::arrow::BooleanType
+struct ArrowTypeTraits<::arrow::BooleanType> {
+ typedef ::arrow::BooleanBuilder builder_type;
+};
+
+template <typename ArrowType>
+using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;
+
+class FileReader::Impl {
+ public:
+ Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+ virtual ~Impl() {}
+
+ bool CheckForFlatColumn(const ColumnDescriptor* descr);
+ Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+ Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+ Status ReadFlatTable(std::shared_ptr<Table>* out);
+
+ private:
+ MemoryPool* pool_;
+ std::unique_ptr<ParquetFileReader> reader_;
+};
+
+class FlatColumnReader::Impl {
+ public:
+ Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+ ParquetFileReader* reader, int column_index);
+ virtual ~Impl() {}
+
+ Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
+ template <typename ArrowType, typename ParquetType>
+ Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+
+ template <typename ArrowType, typename ParquetType>
+ Status ReadNullableFlatBatch(const int16_t* def_levels,
+ typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+ BuilderType<ArrowType>* builder);
+ template <typename ArrowType, typename ParquetType>
+ Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
+ BuilderType<ArrowType>* builder);
+
+ private:
+ void NextRowGroup();
+
+ template <typename InType, typename OutType>
+ struct can_copy_ptr {
+ static constexpr bool value =
+ std::is_same<InType, OutType>::value ||
+ (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+ (sizeof(InType) == sizeof(OutType)));
+ };
+
+ template <typename InType, typename OutType,
+ typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+ Status ConvertPhysicalType(
+ const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+ *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+ return Status::OK();
+ }
+
+ template <typename InType, typename OutType,
+ typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+ Status ConvertPhysicalType(
+ const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+ RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
+ OutType* mutable_out_ptr =
+ reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
+ std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+ *out_ptr = mutable_out_ptr;
+ return Status::OK();
+ }
+
+ MemoryPool* pool_;
+ const ColumnDescriptor* descr_;
+ ParquetFileReader* reader_;
+ int column_index_;
+ int next_row_group_;
+ std::shared_ptr<ColumnReader> column_reader_;
+ std::shared_ptr<Field> field_;
+
+ PoolBuffer values_buffer_;
+ PoolBuffer def_levels_buffer_;
+ PoolBuffer values_builder_buffer_;
+ PoolBuffer valid_bytes_buffer_;
+};
+
+FileReader::Impl::Impl(
+ MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+ : pool_(pool), reader_(std::move(reader)) {}
+
+bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) {
+ if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
+ return false;
+ } else if ((descr->max_definition_level() == 1) &&
+ (descr->schema_node()->repetition() != Repetition::OPTIONAL)) {
+ return false;
+ }
+ return true;
+}
+
+Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+ const SchemaDescriptor* schema = reader_->metadata()->schema();
+
+ if (!CheckForFlatColumn(schema->Column(i))) {
+ return Status::Invalid("The requested column is not flat");
+ }
+ std::unique_ptr<FlatColumnReader::Impl> impl(
+ new FlatColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
+ *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
+ return Status::OK();
+}
+
+Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+ std::unique_ptr<FlatColumnReader> flat_column_reader;
+ RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader));
+ return flat_column_reader->NextBatch(reader_->metadata()->num_rows(), out);
+}
+
+Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+ auto descr = reader_->metadata()->schema();
+
+ const std::string& name = descr->name();
+ std::shared_ptr<::arrow::Schema> schema;
+ RETURN_NOT_OK(FromParquetSchema(descr, &schema));
+
+ int num_columns = reader_->metadata()->num_columns();
+
+ std::vector<std::shared_ptr<Column>> columns(num_columns);
+ for (int i = 0; i < num_columns; i++) {
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(ReadFlatColumn(i, &array));
+ columns[i] = std::make_shared<Column>(schema->field(i), array);
+ }
+
+ *table = std::make_shared<Table>(name, schema, columns);
+ return Status::OK();
+}
+
+FileReader::FileReader(
+ MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+ : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+
+FileReader::~FileReader() {}
+
+// Static ctor
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+ ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader) {
+ std::unique_ptr<ParquetReadSource> source(new ParquetReadSource(allocator));
+ RETURN_NOT_OK(source->Open(file));
+
+ // TODO(wesm): reader properties
+ std::unique_ptr<ParquetReader> pq_reader;
+ PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source)));
+
+ // Use the same memory pool as the ParquetAllocator
+ reader->reset(new FileReader(allocator->pool(), std::move(pq_reader)));
+ return Status::OK();
+}
+
+Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+ return impl_->GetFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+ return impl_->ReadFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+ return impl_->ReadFlatTable(out);
+}
+
+// All four scratch buffers draw from the caller's pool so allocations stay in one place.
+FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+ ParquetFileReader* reader, int column_index)
+ : pool_(pool), descr_(descr), reader_(reader), column_index_(column_index),
+ next_row_group_(0),
+ values_buffer_(pool),
+ def_levels_buffer_(pool),
+ values_builder_buffer_(pool),
+ valid_bytes_buffer_(pool) {
+ NodeToField(descr_->schema_node(), &field_);  // NOTE(review): return value ignored
+ NextRowGroup();  // Load the first row group's column reader, or nullptr if none
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
+ int64_t values_read, BuilderType<ArrowType>* builder) {
+ using ArrowCType = typename ArrowType::c_type;
+ using ParquetCType = typename ParquetType::c_type;
+
+ DCHECK(builder);
+ const ArrowCType* values_ptr = nullptr;
+ RETURN_NOT_OK(
+ (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr)));
+ RETURN_NOT_OK(builder->Append(values_ptr, values_read));
+ return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
+ typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+ BuilderType<ArrowType>* builder) {
+ using ArrowCType = typename ArrowType::c_type;
+
+ DCHECK(builder);
+ RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
+ RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+ auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
+ uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+ int values_idx = 0;
+ for (int64_t i = 0; i < levels_read; i++) {
+ if (def_levels[i] < descr_->max_definition_level()) {
+ valid_bytes[i] = 0;
+ } else {
+ valid_bytes[i] = 1;
+ values_ptr[i] = values[values_idx++];
+ }
+ }
+ RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
+ return Status::OK();
+}
+
+// Reads up to batch_size levels, advancing across row groups, into one Arrow array.
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::TypedReadBatch(
+ int batch_size, std::shared_ptr<Array>* out) {
+ using ParquetCType = typename ParquetType::c_type;
+ int values_to_read = batch_size;
+ BuilderType<ArrowType> builder(pool_, field_->type);
+ while ((values_to_read > 0) && column_reader_) {
+ // Resize reports failure via Status; propagate it instead of reading into a short buffer
+ RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType)));
+ if (descr_->max_definition_level() > 0) {
+ RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
+ }
+ auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
+ int64_t values_read;
+ int64_t levels_read;
+ int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
+ PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+ values_to_read, def_levels, nullptr, values, &values_read));
+ values_to_read -= levels_read;
+ if (descr_->max_definition_level() == 0) {
+ RETURN_NOT_OK(
+ (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
+ } else {
+ // As per the definition and checks for flat columns: max_definition_level() == 1
+ RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
+ def_levels, values, values_read, levels_read, &builder)));
+ }
+ if (!column_reader_->HasNext()) { NextRowGroup(); }
+ }
+ *out = builder.Finish();
+ return Status::OK();
+}
+
+// ByteArray -> string path: each value (or null) is appended to a StringBuilder in turn.
+template <>
+Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
+ int batch_size, std::shared_ptr<Array>* out) {
+ int values_to_read = batch_size;
+ ::arrow::StringBuilder builder(pool_, field_->type);
+ while ((values_to_read > 0) && column_reader_) {
+ RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray)));
+ if (descr_->max_definition_level() > 0) {
+ RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
+ }
+ auto reader = dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
+ int64_t values_read;
+ int64_t levels_read;
+ int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
+ PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+ values_to_read, def_levels, nullptr, values, &values_read));
+ values_to_read -= levels_read;
+ if (descr_->max_definition_level() == 0) {
+ for (int64_t i = 0; i < levels_read; i++) {
+ RETURN_NOT_OK(
+ builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
+ }
+ } else {
+ // descr_->max_definition_level() == 1
+ int values_idx = 0;
+ for (int64_t i = 0; i < levels_read; i++) {
+ if (def_levels[i] < descr_->max_definition_level()) {
+ RETURN_NOT_OK(builder.AppendNull());
+ } else {
+ RETURN_NOT_OK(
+ builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
+ values[values_idx].len));
+ values_idx++;
+ }
+ }
+ }
+ if (!column_reader_->HasNext()) { NextRowGroup(); }
+ }
+ *out = builder.Finish();
+ return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+ case ::arrow::Type::ENUM: \
+ return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
+ break;
+
+Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+ if (!column_reader_) {
+ // Exhausted all row groups.
+ *out = nullptr;
+ return Status::OK();
+ }
+
+ switch (field_->type->type) {
+ TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+ TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+ TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+ TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+ TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+ TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+ TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+ TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+ TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+ TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+ TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+ TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
+ TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+ default:
+ return Status::NotImplemented(field_->type->ToString());
+ }
+}
+
+void FlatColumnReader::Impl::NextRowGroup() {
+ if (next_row_group_ < reader_->metadata()->num_row_groups()) {
+ column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
+ next_row_group_++;
+ } else {
+ column_reader_ = nullptr;
+ }
+}
+
+FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+
+FlatColumnReader::~FlatColumnReader() {}
+
+Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+ return impl_->NextBatch(batch_size, out);
+}
+
+} // namespace arrow
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
new file mode 100644
index 0000000..e725728
--- /dev/null
+++ b/src/parquet/arrow/reader.h
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_READER_H
+#define PARQUET_ARROW_READER_H
+
+#include <memory>
+
+#include "parquet/api/reader.h"
+#include "parquet/api/schema.h"
+#include "parquet/arrow/io.h"
+
+#include "arrow/io/interfaces.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class RowBatch;
+class Status;
+class Table;
+}
+
+namespace parquet {
+
+namespace arrow {
+
+class FlatColumnReader;
+
+// Arrow read adapter class for deserializing Parquet files as Arrow row
+// batches.
+//
+// TODO(wesm): nested data does not always make sense with this user
+// interface unless you are only reading a single leaf node from a branch of
+// a table. For example:
+//
+// repeated group data {
+// optional group record {
+// optional int32 val1;
+// optional byte_array val2;
+// optional bool val3;
+// }
+// optional int32 val4;
+// }
+//
+// In the Parquet file, there are 4 leaf nodes:
+//
+// * data.record.val1
+// * data.record.val2
+// * data.record.val3
+// * data.val4
+//
+// When materializing this data in an Arrow array, we would have:
+//
+// data: list<struct<
+// record: struct<
+// val1: int32,
+// val2: string (= list<uint8>),
+// val3: bool,
+// >,
+// val4: int32
+// >>
+//
+// However, in the Parquet format, each leaf node has its own repetition and
+// definition levels describing the structure of the intermediate nodes in
+// this array structure. Thus, we will need to scan the leaf data for a group
+// of leaf nodes part of the same type tree to create a single result Arrow
+// nested array structure.
+//
+// This is additionally complicated by "chunky" repeated fields or very large
+// byte arrays
+class PARQUET_EXPORT FileReader {
+ public:
+ FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+
+ // Since the distribution of columns amongst a Parquet file's row groups may
+ // be uneven (the number of values in each column chunk can be different), we
+ // provide a column-oriented read interface. The ColumnReader hides the
+ // details of paging through the file's row groups and yielding
+ // fully-materialized arrow::Array instances
+ //
+ // Returns error status if the column of interest is not flat.
+ ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+ // Read column as a whole into an Array.
+ ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);
+ // Read a table of flat columns into a Table.
+ ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);
+
+ virtual ~FileReader();
+
+ private:
+ class PARQUET_NO_EXPORT Impl;
+ std::unique_ptr<Impl> impl_;
+};
+
+// At this point, the column reader is a stream iterator. It only knows how to
+// read the next batch of values for a particular column from the file until it
+// runs out.
+//
+// We also do not expose any internal Parquet details, such as row groups. This
+// might change in the future.
+class PARQUET_EXPORT FlatColumnReader {
+ public:
+ virtual ~FlatColumnReader();
+
+ // Scan the next array of the indicated size. The actual size of the
+ // returned array may be less than the passed size depending how much data is
+ // available in the file.
+ //
+ // When all the data in the file has been exhausted, the result is set to
+ // nullptr.
+ //
+ // Returns Status::OK on a successful read, including if you have exhausted
+ // the data available in the file.
+ ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out);
+
+ private:
+ class PARQUET_NO_EXPORT Impl;
+ std::unique_ptr<Impl> impl_;
+ explicit FlatColumnReader(std::unique_ptr<Impl> impl);
+
+ friend class FileReader;
+};
+
+// Helper function to create a file reader from an implementation of an Arrow
+// readable file
+PARQUET_EXPORT
+::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+ ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader);
+
+} // namespace arrow
+} // namespace parquet
+
+#endif // PARQUET_ARROW_READER_H