Posted to commits@parquet.apache.org by we...@apache.org on 2016/09/18 18:01:49 UTC

[1/2] parquet-cpp git commit: PARQUET-712: Add library to read into Arrow memory

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 8ef68b17a -> 4a7bf1174


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
new file mode 100644
index 0000000..5a38a28
--- /dev/null
+++ b/src/parquet/arrow/schema.cc
@@ -0,0 +1,351 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/schema.h"
+
+#include <string>
+#include <vector>
+
+#include "parquet/api/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/types/decimal.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::Field;
+using arrow::Status;
+using arrow::TypePtr;
+
+using ArrowType = arrow::Type;
+
+using parquet::Repetition;
+using parquet::schema::Node;
+using parquet::schema::NodePtr;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+using ParquetType = parquet::Type;
+using parquet::LogicalType;
+
+namespace parquet {
+
+namespace arrow {
+
+const auto BOOL = std::make_shared<::arrow::BooleanType>();
+const auto UINT8 = std::make_shared<::arrow::UInt8Type>();
+const auto INT8 = std::make_shared<::arrow::Int8Type>();
+const auto UINT16 = std::make_shared<::arrow::UInt16Type>();
+const auto INT16 = std::make_shared<::arrow::Int16Type>();
+const auto UINT32 = std::make_shared<::arrow::UInt32Type>();
+const auto INT32 = std::make_shared<::arrow::Int32Type>();
+const auto UINT64 = std::make_shared<::arrow::UInt64Type>();
+const auto INT64 = std::make_shared<::arrow::Int64Type>();
+const auto FLOAT = std::make_shared<::arrow::FloatType>();
+const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
+const auto UTF8 = std::make_shared<::arrow::StringType>();
+const auto TIMESTAMP_MS =
+    std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
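+// BINARY columns are represented as a list of unsigned bytes (see also the
+// BYTE_ARRAY TODO in NodeToField below).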
+const auto BINARY =
+    std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8));
+
+static TypePtr MakeDecimalType(const PrimitiveNode* node) {
+  int precision = node->decimal_metadata().precision;
+  int scale = node->decimal_metadata().scale;
+  return std::make_shared<::arrow::DecimalType>(precision, scale);
+}
+
+static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
+  switch (node->logical_type()) {
+    case LogicalType::UTF8:
+      *out = UTF8;
+      break;
+    case LogicalType::DECIMAL:
+      *out = MakeDecimalType(node);
+      break;
+    default:
+      // BINARY
+      *out = BINARY;
+      break;
+  }
+  return Status::OK();
+}
+
+static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
+  switch (node->logical_type()) {
+    case LogicalType::NONE:
+      *out = BINARY;
+      break;
+    case LogicalType::DECIMAL:
+      *out = MakeDecimalType(node);
+      break;
+    default:
+      return Status::NotImplemented("unhandled type");
+      break;
+  }
+
+  return Status::OK();
+}
+
+static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
+  switch (node->logical_type()) {
+    case LogicalType::NONE:
+      *out = INT32;
+      break;
+    case LogicalType::UINT_8:
+      *out = UINT8;
+      break;
+    case LogicalType::INT_8:
+      *out = INT8;
+      break;
+    case LogicalType::UINT_16:
+      *out = UINT16;
+      break;
+    case LogicalType::INT_16:
+      *out = INT16;
+      break;
+    case LogicalType::UINT_32:
+      *out = UINT32;
+      break;
+    case LogicalType::DECIMAL:
+      *out = MakeDecimalType(node);
+      break;
+    default:
+      return Status::NotImplemented("Unhandled logical type for int32");
+      break;
+  }
+  return Status::OK();
+}
+
+static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
+  switch (node->logical_type()) {
+    case LogicalType::NONE:
+      *out = INT64;
+      break;
+    case LogicalType::UINT_64:
+      *out = UINT64;
+      break;
+    case LogicalType::DECIMAL:
+      *out = MakeDecimalType(node);
+      break;
+    case LogicalType::TIMESTAMP_MILLIS:
+      *out = TIMESTAMP_MS;
+      break;
+    default:
+      return Status::NotImplemented("Unhandled logical type for int64");
+      break;
+  }
+  return Status::OK();
+}
+
+// TODO: Logical Type Handling
+Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
+  std::shared_ptr<::arrow::DataType> type;
+
+  if (node->is_repeated()) {
+    return Status::NotImplemented("No support yet for repeated node types");
+  }
+
+  if (node->is_group()) {
+    const GroupNode* group = static_cast<const GroupNode*>(node.get());
+    std::vector<std::shared_ptr<Field>> fields(group->field_count());
+    for (int i = 0; i < group->field_count(); i++) {
+      RETURN_NOT_OK(NodeToField(group->field(i), &fields[i]));
+    }
+    type = std::make_shared<::arrow::StructType>(fields);
+  } else {
+    // Primitive (leaf) node
+    const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
+
+    switch (primitive->physical_type()) {
+      case ParquetType::BOOLEAN:
+        type = BOOL;
+        break;
+      case ParquetType::INT32:
+        RETURN_NOT_OK(FromInt32(primitive, &type));
+        break;
+      case ParquetType::INT64:
+        RETURN_NOT_OK(FromInt64(primitive, &type));
+        break;
+      case ParquetType::INT96:
+        // TODO: Do we have that type in Arrow?
+        // type = TypePtr(new Int96Type());
+        return Status::NotImplemented("int96");
+      case ParquetType::FLOAT:
+        type = FLOAT;
+        break;
+      case ParquetType::DOUBLE:
+        type = DOUBLE;
+        break;
+      case ParquetType::BYTE_ARRAY:
+        // TODO: Do we have that type in Arrow?
+        RETURN_NOT_OK(FromByteArray(primitive, &type));
+        break;
+      case ParquetType::FIXED_LEN_BYTE_ARRAY:
+        RETURN_NOT_OK(FromFLBA(primitive, &type));
+        break;
+    }
+  }
+
+  *out = std::make_shared<Field>(node->name(), type, !node->is_required());
+  return Status::OK();
+}
+
+Status FromParquetSchema(
+    const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) {
+  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
+  // from the root Parquet node
+  const GroupNode* schema_node = parquet_schema->group_node();
+
+  std::vector<std::shared_ptr<Field>> fields(schema_node->field_count());
+  for (int i = 0; i < schema_node->field_count(); i++) {
+    RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i]));
+  }
+
+  *out = std::make_shared<::arrow::Schema>(fields);
+  return Status::OK();
+}
+
+Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
+    const std::string& name, bool nullable, const WriterProperties& properties,
+    NodePtr* out) {
+  Repetition::type repetition = Repetition::REQUIRED;
+  if (nullable) { repetition = Repetition::OPTIONAL; }
+
+  std::vector<NodePtr> children(type->num_children());
+  for (int i = 0; i < type->num_children(); i++) {
+    RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
+  }
+
+  *out = GroupNode::Make(name, repetition, children);
+  return Status::OK();
+}
+
+Status FieldToNode(const std::shared_ptr<Field>& field,
+    const WriterProperties& properties, NodePtr* out) {
+  LogicalType::type logical_type = LogicalType::NONE;
+  ParquetType::type type;
+  Repetition::type repetition = Repetition::REQUIRED;
+  if (field->nullable) { repetition = Repetition::OPTIONAL; }
+  int length = -1;
+
+  switch (field->type->type) {
+    // TODO:
+    // case ArrowType::NA:
+    // break;
+    case ArrowType::BOOL:
+      type = ParquetType::BOOLEAN;
+      break;
+    case ArrowType::UINT8:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::UINT_8;
+      break;
+    case ArrowType::INT8:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::INT_8;
+      break;
+    case ArrowType::UINT16:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::UINT_16;
+      break;
+    case ArrowType::INT16:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::INT_16;
+      break;
+    case ArrowType::UINT32:
+      if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
+        type = ParquetType::INT64;
+      } else {
+        type = ParquetType::INT32;
+        logical_type = LogicalType::UINT_32;
+      }
+      break;
+    case ArrowType::INT32:
+      type = ParquetType::INT32;
+      break;
+    case ArrowType::UINT64:
+      type = ParquetType::INT64;
+      logical_type = LogicalType::UINT_64;
+      break;
+    case ArrowType::INT64:
+      type = ParquetType::INT64;
+      break;
+    case ArrowType::FLOAT:
+      type = ParquetType::FLOAT;
+      break;
+    case ArrowType::DOUBLE:
+      type = ParquetType::DOUBLE;
+      break;
+    case ArrowType::STRING:
+      type = ParquetType::BYTE_ARRAY;
+      logical_type = LogicalType::UTF8;
+      break;
+    case ArrowType::BINARY:
+      type = ParquetType::BYTE_ARRAY;
+      break;
+    case ArrowType::DATE:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::DATE;
+      break;
+    case ArrowType::TIMESTAMP: {
+      auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type.get());
+      if (timestamp_type->unit != ::arrow::TimestampType::Unit::MILLI) {
+        return Status::NotImplemented(
+            "Other timestamp units than millisecond are not yet support with parquet.");
+      }
+      type = ParquetType::INT64;
+      logical_type = LogicalType::TIMESTAMP_MILLIS;
+    } break;
+    case ArrowType::TIMESTAMP_DOUBLE:
+      type = ParquetType::INT64;
+      // This is specified as seconds since the UNIX epoch
+      // TODO: Converted type in Parquet?
+      // logical_type = LogicalType::TIMESTAMP_MILLIS;
+      break;
+    case ArrowType::TIME:
+      type = ParquetType::INT64;
+      logical_type = LogicalType::TIME_MILLIS;
+      break;
+    case ArrowType::STRUCT: {
+      auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type);
+      return StructToNode(struct_type, field->name, field->nullable, properties, out);
+    } break;
+    default:
+      // TODO: LIST, DENSE_UNION, SPARSE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
+      return Status::NotImplemented("unhandled type");
+  }
+  *out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length);
+  return Status::OK();
+}
+
+Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
+    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out) {
+  std::vector<NodePtr> nodes(arrow_schema->num_fields());
+  for (int i = 0; i < arrow_schema->num_fields(); i++) {
+    RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
+  }
+
+  NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+  *out = std::make_shared<::parquet::SchemaDescriptor>();
+  PARQUET_CATCH_NOT_OK((*out)->Init(schema));
+
+  return Status::OK();
+}
+
+}  // namespace arrow
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
new file mode 100644
index 0000000..6917b90
--- /dev/null
+++ b/src/parquet/arrow/schema.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_SCHEMA_H
+#define PARQUET_ARROW_SCHEMA_H
+
+#include <memory>
+
+#include "arrow/schema.h"
+#include "arrow/type.h"
+#include "arrow/util/visibility.h"
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class Status;
+
+}  // namespace arrow
+
+namespace parquet {
+
+namespace arrow {
+
+::arrow::Status PARQUET_EXPORT NodeToField(
+    const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out);
+
+::arrow::Status PARQUET_EXPORT FromParquetSchema(
+    const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);
+
+::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field,
+    const WriterProperties& properties, schema::NodePtr* out);
+
+::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema,
+    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out);
+
+}  // namespace arrow
+
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_SCHEMA_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
new file mode 100644
index 0000000..deac9f7
--- /dev/null
+++ b/src/parquet/arrow/test-util.h
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "arrow/test-util.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+
+namespace parquet {
+
+namespace arrow {
+
+template <typename ArrowType>
+using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
+
+template <class ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+template <class ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+template <class ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<::arrow::StringArray>>::type
+NonNullArray(size_t size) {
+  ::arrow::StringBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
+  for (size_t i = 0; i < size; i++) {
+    builder.Append("test-string");
+  }
+  return std::static_pointer_cast<::arrow::StringArray>(builder.Finish());
+}
+
+template <>
+std::shared_ptr<::arrow::PrimitiveArray> NonNullArray<::arrow::BooleanType>(size_t size) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// This helper function supports at most (size / 2) nulls, placed at even indices.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// This helper function supports at most (size / 2) nulls, placed at even indices.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<::arrow::PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+// This helper function supports at most (size / 2) nulls, placed at even indices.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<::arrow::StringArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::StringBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
+  for (size_t i = 0; i < size; i++) {
+    builder.Append("test-string");
+  }
+  return std::static_pointer_cast<::arrow::StringArray>(builder.Finish());
+}
+
+// This helper function supports at most (size / 2) nulls, placed at even indices.
+template <>
+std::shared_ptr<::arrow::PrimitiveArray> NullableArray<::arrow::BooleanType>(
+    size_t size, size_t num_nulls) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<::arrow::PrimitiveArray>(builder.Finish());
+}
+
+std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
+    const std::shared_ptr<::arrow::Array>& array, bool nullable) {
+  auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
+  return std::make_shared<::arrow::Column>(field, array);
+}
+
+std::shared_ptr<::arrow::Table> MakeSimpleTable(
+    const std::shared_ptr<::arrow::Array>& values, bool nullable) {
+  std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable);
+  std::vector<std::shared_ptr<::arrow::Column>> columns({column});
+  std::vector<std::shared_ptr<::arrow::Field>> fields({column->field()});
+  auto schema = std::make_shared<::arrow::Schema>(fields);
+  return std::make_shared<::arrow::Table>("table", schema, columns);
+}
+
+template <typename T>
+void ExpectArray(T* expected, ::arrow::Array* result) {
+  auto p_array = static_cast<::arrow::PrimitiveArray*>(result);
+  for (int i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i], reinterpret_cast<const T*>(p_array->data()->data())[i]);
+  }
+}
+
+template <typename ArrowType>
+void ExpectArray(typename ArrowType::c_type* expected, ::arrow::Array* result) {
+  ::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result);
+  for (int64_t i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i],
+        reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
+  }
+}
+
+template <>
+void ExpectArray<::arrow::BooleanType>(uint8_t* expected, ::arrow::Array* result) {
+  ::arrow::BooleanBuilder builder(
+      ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
+  builder.Append(expected, result->length());
+  std::shared_ptr<::arrow::Array> expected_array = builder.Finish();
+  EXPECT_TRUE(result->Equals(expected_array));
+}
+
+}  // namespace arrow
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/utils.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/utils.h b/src/parquet/arrow/utils.h
new file mode 100644
index 0000000..b443c99
--- /dev/null
+++ b/src/parquet/arrow/utils.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_UTILS_H
+#define PARQUET_ARROW_UTILS_H
+
+#include <sstream>
+
+#include "arrow/util/status.h"
+#include "parquet/exception.h"
+
+namespace parquet {
+namespace arrow {
+
+#define PARQUET_CATCH_NOT_OK(s)                    \
+  try {                                            \
+    (s);                                           \
+  } catch (const ::parquet::ParquetException& e) { \
+    return ::arrow::Status::Invalid(e.what());     \
+  }
+
+#define PARQUET_IGNORE_NOT_OK(s) \
+  try {                          \
+    (s);                         \
+  } catch (const ::parquet::ParquetException& e) {}
+
+#define PARQUET_THROW_NOT_OK(s)                    \
+  do {                                             \
+    ::arrow::Status _s = (s);                      \
+    if (!_s.ok()) {                                \
+      std::stringstream ss;                        \
+      ss << "Arrow error: " << _s.ToString();      \
+      throw ::parquet::ParquetException(ss.str()); \
+    }                                              \
+  } while (0)
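+
+// Usage sketch (illustrative): PARQUET_CATCH_NOT_OK converts a parquet-cpp
+// exception into an ::arrow::Status at Status-returning API boundaries, e.g.
+//
+//   ::arrow::Status CloseWriter(ColumnWriter* writer) {
+//     PARQUET_CATCH_NOT_OK(writer->Close());
+//     return ::arrow::Status::OK();
+//   }
+//
+// while PARQUET_THROW_NOT_OK performs the reverse translation inside
+// exception-based parquet-cpp code.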
+
+}  // namespace arrow
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_UTILS_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
new file mode 100644
index 0000000..5b5f41f
--- /dev/null
+++ b/src/parquet/arrow/writer.cc
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/writer.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/array.h"
+#include "arrow/column.h"
+#include "arrow/table.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::MemoryPool;
+using arrow::PoolBuffer;
+using arrow::PrimitiveArray;
+using arrow::Status;
+using arrow::StringArray;
+using arrow::Table;
+
+using parquet::ParquetFileWriter;
+using parquet::ParquetVersion;
+using parquet::schema::GroupNode;
+
+namespace parquet {
+namespace arrow {
+
+class FileWriter::Impl {
+ public:
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+
+  Status NewRowGroup(int64_t chunk_size);
+  template <typename ParquetType, typename ArrowType>
+  Status TypedWriteBatch(ColumnWriter* writer, const PrimitiveArray* data,
+      int64_t offset, int64_t length);
+
+  // TODO(uwe): Same code as in reader.cc; the only difference is the name of
+  // the temporary buffer.
+  template <typename InType, typename OutType>
+  struct can_copy_ptr {
+    static constexpr bool value =
+        std::is_same<InType, OutType>::value ||
+        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+            (sizeof(InType) == sizeof(OutType)));
+  };
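+  // For example, can_copy_ptr<uint64_t, int64_t>::value is true (equal-width
+  // integral types), so ConvertPhysicalType reinterprets the input buffer in
+  // place; can_copy_ptr<int16_t, int32_t>::value is false, so the values are
+  // copy-widened into data_buffer_.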
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) {
+    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+    return Status::OK();
+  }
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
+    OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
+    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+    *out_ptr = mutable_out_ptr;
+    return Status::OK();
+  }
+
+  Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
+  Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length);
+  Status Close();
+
+  virtual ~Impl() {}
+
+ private:
+  friend class FileWriter;
+
+  MemoryPool* pool_;
+  // Buffer used for storing the data of an array converted to the physical type
+  // as expected by parquet-cpp.
+  PoolBuffer data_buffer_;
+  PoolBuffer def_levels_buffer_;
+  std::unique_ptr<ParquetFileWriter> writer_;
+  RowGroupWriter* row_group_writer_;
+};
+
+FileWriter::Impl::Impl(
+    MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+    : pool_(pool),
+      data_buffer_(pool),
+      writer_(std::move(writer)),
+      row_group_writer_(nullptr) {}
+
+Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+  return Status::OK();
+}
+
+template <typename ParquetType, typename ArrowType>
+Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  DCHECK((offset + length) <= data->length());
+  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
+  auto writer =
+      reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    const ParquetCType* data_writer_ptr = nullptr;
+    RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+        data_ptr, length, &data_writer_ptr)));
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
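+    // A maximum definition level of 1 means a flat, nullable column:
+    // definition level 1 marks a present value, 0 marks a null.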
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      const ParquetCType* data_writer_ptr = nullptr;
+      RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+          data_ptr, length, &data_writer_ptr)));
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
+    } else {
+      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
+      auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
+      int buffer_idx = 0;
+      for (int i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+// This specialization is quite similar to the generic one, but differs in two points:
+// * the offset is applied to the pointer as late as possible, since access is sub-byte
+// * Arrow stores the data bitwise, thus we cannot use std::copy to transform from
+//   ArrowType::c_type to ParquetType::c_type
+template <>
+Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
+    ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
+    int64_t length) {
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length));
+  auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
+  auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
+  auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = ::arrow::util::get_bit(data_ptr, offset + i);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      for (int64_t i = 0; i < length; i++) {
+        buffer_ptr[i] = ::arrow::util::get_bit(data_ptr, offset + i);
+      }
+      // TODO(PARQUET-644): write boolean values as a packed bitmap
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    } else {
+      int buffer_idx = 0;
+      for (int i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = ::arrow::util::get_bit(data_ptr, offset + i);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+Status FileWriter::Impl::Close() {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(writer_->Close());
+  return Status::OK();
+}
+
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                            \
+  case ::arrow::Type::ENUM:                                                       \
+    return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
+    break;
+
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  ColumnWriter* writer;
+  PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
+  switch (data->type_enum()) {
+    TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    case ::arrow::Type::UINT32:
+      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
+        // Parquet 1.0 readers cannot read the UINT_32 logical type. Thus we need
+        // to use the larger Int64Type to store the values losslessly.
+        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
+            writer, data, offset, length);
+      } else {
+        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
+            writer, data, offset, length);
+      }
+    TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    default:
+      return Status::NotImplemented(data->type()->ToString());
+  }
+}
+
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const StringArray* data, int64_t offset, int64_t length) {
+  ColumnWriter* column_writer;
+  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ByteArray)));
+  auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
+  auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values());
+  DCHECK(values != nullptr);
+  auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data());
+  auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+  }
+  int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = ByteArray(
+          data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+    }
+    if (writer->descr()->max_definition_level() > 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    int buffer_idx = 0;
+    for (int64_t i = 0; i < length; i++) {
+      if (data->IsNull(offset + i)) {
+        def_levels_ptr[i] = 0;
+      } else {
+        def_levels_ptr[i] = 1;
+        buffer_ptr[buffer_idx++] = ByteArray(
+            data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+      }
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+FileWriter::FileWriter(
+    MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+    : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+  return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteFlatColumnChunk(
+    const ::arrow::Array* array, int64_t offset, int64_t length) {
+  int64_t real_length = length;
+  if (length == -1) { real_length = array->length(); }
+  if (array->type_enum() == ::arrow::Type::STRING) {
+    auto string_array = dynamic_cast<const ::arrow::StringArray*>(array);
+    DCHECK(string_array);
+    return impl_->WriteFlatColumnChunk(string_array, offset, real_length);
+  } else {
+    auto primitive_array = dynamic_cast<const PrimitiveArray*>(array);
+    if (!primitive_array) {
+      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+    }
+    return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length);
+  }
+}
+
+Status FileWriter::Close() {
+  return impl_->Close();
+}
+
+MemoryPool* FileWriter::memory_pool() const {
+  return impl_->pool_;
+}
+
+FileWriter::~FileWriter() {}
+
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties) {
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  RETURN_NOT_OK(
+      ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+  std::unique_ptr<ParquetFileWriter> parquet_writer =
+      ParquetFileWriter::Open(sink, schema_node, properties);
+  FileWriter writer(pool, std::move(parquet_writer));
+
+  // TODO(ARROW-232) Support writing chunked arrays.
+  for (int i = 0; i < table->num_columns(); i++) {
+    if (table->column(i)->data()->num_chunks() != 1) {
+      return Status::NotImplemented("No support for writing chunked arrays yet.");
+    }
+  }
+
+  for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
+    int64_t offset = chunk * chunk_size;
+    int64_t size = std::min(chunk_size, table->num_rows() - offset);
+    RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
+    for (int i = 0; i < table->num_columns(); i++) {
+      std::shared_ptr<::arrow::Array> array = table->column(i)->data()->chunk(0);
+      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size),
+          PARQUET_IGNORE_NOT_OK(writer.Close()));
+    }
+  }
+
+  return writer.Close();
+}
+
+}  // namespace arrow
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
new file mode 100644
index 0000000..92524d8
--- /dev/null
+++ b/src/parquet/arrow/writer.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_WRITER_H
+#define PARQUET_ARROW_WRITER_H
+
+#include <memory>
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class PrimitiveArray;
+class RowBatch;
+class Status;
+class StringArray;
+class Table;
+}
+
+namespace parquet {
+
+namespace arrow {
+
+/**
+ * Iterative API:
+ *  Start a new row group/chunk with NewRowGroup, then write each column's
+ *  whole column chunk in turn with WriteFlatColumnChunk.
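+ *
+ * A minimal call-sequence sketch (illustrative; assumes an already-opened
+ * ParquetFileWriter `pq_writer` and a flat ::arrow::Array `array`):
+ *
+ *   FileWriter writer(::arrow::default_memory_pool(), std::move(pq_writer));
+ *   RETURN_NOT_OK(writer.NewRowGroup(array->length()));
+ *   RETURN_NOT_OK(writer.WriteFlatColumnChunk(array.get()));
+ *   RETURN_NOT_OK(writer.Close());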
+ */
+class PARQUET_EXPORT FileWriter {
+ public:
+  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+
+  ::arrow::Status NewRowGroup(int64_t chunk_size);
+  ::arrow::Status WriteFlatColumnChunk(
+      const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1);
+  ::arrow::Status Close();
+
+  virtual ~FileWriter();
+
+  ::arrow::MemoryPool* memory_pool() const;
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+/**
+ * Write a flat Table to Parquet.
+ *
+ * The table shall only consist of nullable, non-repeated columns of primitive type.
+ */
+::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
+    ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
+    int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+
+}  // namespace arrow
+
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_WRITER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/thirdparty/build_thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/build_thirdparty.sh b/thirdparty/build_thirdparty.sh
index dca586c..8229484 100755
--- a/thirdparty/build_thirdparty.sh
+++ b/thirdparty/build_thirdparty.sh
@@ -15,6 +15,7 @@ else
   # Allow passing specific libs to build on the command line
   for arg in "$*"; do
     case $arg in
+      "arrow")      F_ARROW=1 ;;
       "zlib")       F_ZLIB=1 ;;
       "gbenchmark") F_GBENCHMARK=1 ;;
       "gtest")      F_GTEST=1 ;;
@@ -57,6 +58,15 @@ fi
 
 STANDARD_DARWIN_FLAGS="-std=c++11 -stdlib=libc++"
 
+# build arrow
+if [ -n "$F_ALL" -o -n "$F_ARROW" ]; then
+    cd $TP_DIR/$ARROW_BASEDIR/cpp
+    source ./setup_build_env.sh
+    cmake . -DARROW_PARQUET=OFF -DARROW_HDFS=ON -DCMAKE_INSTALL_PREFIX=$PREFIX
+    make -j$PARALLEL install
+fi
+
 # build googletest
 GOOGLETEST_ERROR="failed for googletest!"
 if [ -n "$F_ALL" -o -n "$F_GTEST" ]; then

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/thirdparty/download_thirdparty.sh
----------------------------------------------------------------------
diff --git a/thirdparty/download_thirdparty.sh b/thirdparty/download_thirdparty.sh
index fae6c9c..23bdb96 100755
--- a/thirdparty/download_thirdparty.sh
+++ b/thirdparty/download_thirdparty.sh
@@ -20,6 +20,11 @@ download_extract_and_cleanup() {
     rm $filename
 }
 
+if [ ! -d ${ARROW_BASEDIR} ]; then
+  echo "Fetching arrow"
+  download_extract_and_cleanup $ARROW_URL
+fi
+
 if [ ! -d ${SNAPPY_BASEDIR} ]; then
   echo "Fetching snappy"
   download_extract_and_cleanup $SNAPPY_URL

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/thirdparty/set_thirdparty_env.sh
----------------------------------------------------------------------
diff --git a/thirdparty/set_thirdparty_env.sh b/thirdparty/set_thirdparty_env.sh
index 80715ef..3733d08 100644
--- a/thirdparty/set_thirdparty_env.sh
+++ b/thirdparty/set_thirdparty_env.sh
@@ -7,6 +7,7 @@ if [ -z "$THIRDPARTY_DIR" ]; then
 	THIRDPARTY_DIR=$SOURCE_DIR
 fi
 
+export ARROW_HOME=$THIRDPARTY_DIR/installed
 export SNAPPY_HOME=$THIRDPARTY_DIR/installed
 export ZLIB_HOME=$THIRDPARTY_DIR/installed
 # build script doesn't support building thrift on OSX

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/thirdparty/versions.sh
----------------------------------------------------------------------
diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh
index b56262a..05f5cc2 100755
--- a/thirdparty/versions.sh
+++ b/thirdparty/versions.sh
@@ -1,3 +1,7 @@
+ARROW_VERSION="6b8abb4402ff1f39fc5944a7df6e3b4755691d87"
+ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz"
+ARROW_BASEDIR="arrow-${ARROW_VERSION}"
+
 SNAPPY_VERSION=1.1.3
 SNAPPY_URL="https://github.com/google/snappy/releases/download/${SNAPPY_VERSION}/snappy-${SNAPPY_VERSION}.tar.gz"
 SNAPPY_BASEDIR=snappy-$SNAPPY_VERSION


[2/2] parquet-cpp git commit: PARQUET-712: Add library to read into Arrow memory

Posted by we...@apache.org.
PARQUET-712: Add library to read into Arrow memory

At the moment this is just a move of the existing code into a state where it compiles; a minimal usage sketch of the new writer API follows the checklist below. Outstanding work includes:

- [x] Understand the issues with ParquetException typeid matching in the tests *on macOS*. @wesm We already had this problem and you fixed it somewhere. Do you remember the solution?
- [x] Understand why BooleanType tests break with a Thrift exception
- [ ] Add functions that read directly into Arrow memory and not intermediate structures.
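For orientation, a minimal sketch of the writer API added in this patch (illustrative only; `WriteTableToSink` is a hypothetical wrapper, and the `WriteFlatTable` signature is taken from src/parquet/arrow/writer.h below):

    #include <memory>

    #include "parquet/arrow/writer.h"

    // Write a flat, single-chunk Arrow table to a Parquet output stream in
    // row groups of at most chunk_size rows, with default writer properties.
    ::arrow::Status WriteTableToSink(const ::arrow::Table* table,
        ::arrow::MemoryPool* pool,
        const std::shared_ptr<parquet::OutputStream>& sink, int64_t chunk_size) {
      return parquet::arrow::WriteFlatTable(table, pool, sink, chunk_size);
    }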

Author: Uwe L. Korn <uw...@xhochy.com>
Author: Korn, Uwe <Uw...@blue-yonder.com>

Closes #158 from xhochy/PARQUET-712 and squashes the following commits:

e55ab1f [Uwe L. Korn] verbose ctest output
62f0f88 [Uwe L. Korn] Add static linkage
fc2c316 [Uwe L. Korn] Style fixes
3f3e24b [Uwe L. Korn] Fix templating problem
45de044 [Uwe L. Korn] Style fixes for IO
1d39a60 [Uwe L. Korn] Import MemoryPool instead of declaring it
251262a [Uwe L. Korn] Style fixes
e0e1518 [Uwe L. Korn] Build parquet_arrow in Travis
874b33d [Korn, Uwe] Add boost libraries for Arrow
142a364 [Korn, Uwe] PARQUET-712: Add library to read into Arrow memory


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/4a7bf117
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/4a7bf117
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/4a7bf117

Branch: refs/heads/master
Commit: 4a7bf1174419db6b2e4fc1847d866c4763b7be44
Parents: 8ef68b1
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sun Sep 18 14:01:41 2016 -0400
Committer: Wes McKinney <we...@apache.org>
Committed: Sun Sep 18 14:01:41 2016 -0400

----------------------------------------------------------------------
 .travis.yml                                   |   6 +-
 CMakeLists.txt                                |  35 ++
 ci/travis_script_cpp.sh                       |   4 +-
 cmake_modules/FindArrow.cmake                 |  87 ++++
 conda.recipe/build.sh                         |   1 +
 src/parquet/arrow/CMakeLists.txt              |  90 ++++
 src/parquet/arrow/arrow-io-test.cc            | 189 ++++++++
 src/parquet/arrow/arrow-reader-writer-test.cc | 502 +++++++++++++++++++++
 src/parquet/arrow/arrow-schema-test.cc        | 265 +++++++++++
 src/parquet/arrow/io.cc                       | 107 +++++
 src/parquet/arrow/io.h                        |  82 ++++
 src/parquet/arrow/reader.cc                   | 406 +++++++++++++++++
 src/parquet/arrow/reader.h                    | 148 ++++++
 src/parquet/arrow/schema.cc                   | 351 ++++++++++++++
 src/parquet/arrow/schema.h                    |  56 +++
 src/parquet/arrow/test-util.h                 | 202 +++++++++
 src/parquet/arrow/utils.h                     |  54 +++
 src/parquet/arrow/writer.cc                   | 373 +++++++++++++++
 src/parquet/arrow/writer.h                    |  78 ++++
 thirdparty/build_thirdparty.sh                |  10 +
 thirdparty/download_thirdparty.sh             |   5 +
 thirdparty/set_thirdparty_env.sh              |   1 +
 thirdparty/versions.sh                        |   4 +
 23 files changed, 3052 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 780d9f9..6dc994e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,6 +9,8 @@ addons:
     - g++-4.9
     - valgrind
     - libboost-dev
+    - libboost-filesystem-dev
+    - libboost-system-dev
     - libboost-program-options-dev
     - libboost-test-dev
     - libssl-dev
@@ -26,7 +28,7 @@ matrix:
     before_script:
     - source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh
     - cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_TEST_MEMCHECK=ON -DPARQUET_BUILD_BENCHMARKS=ON
-      -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR
+      -DPARQUET_ARROW=ON -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR
     - export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data
   - compiler: clang
     os: linux
@@ -76,7 +78,7 @@ before_install:
 
 before_script:
 - source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh
-- cmake -DCMAKE_CXX_FLAGS="-Werror" $TRAVIS_BUILD_DIR
+- cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_ARROW=ON $TRAVIS_BUILD_DIR
 - export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data
 
 script:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3878056..42b10ee 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,9 @@ if ("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(PARQUET_BUILD_EXECUTABLES
     "Build the libparquet executable CLI tools"
     ON)
+  option(PARQUET_ARROW
+    "Build the Arrow support"
+    OFF)
 endif()
 
 # If build in-source, create the latest symlink. If build out-of-source, which is
@@ -247,6 +250,16 @@ function(ADD_PARQUET_TEST_DEPENDENCIES REL_TEST_NAME)
   add_dependencies(${TEST_NAME} ${ARGN})
 endfunction()
 
+# A wrapper for target_link_libraries() that is compatible with PARQUET_BUILD_TESTS.
+function(ADD_PARQUET_LINK_LIBRARIES REL_TEST_NAME)
+  if(NOT PARQUET_BUILD_TESTS)
+    return()
+  endif()
+  get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE)
+
+  target_link_libraries(${TEST_NAME} ${ARGN})
+endfunction()
+
 enable_testing()
 
 ############################################################
@@ -553,6 +566,12 @@ if (PARQUET_BUILD_SHARED)
     target_link_libraries(parquet_shared
       LINK_PUBLIC ${LIBPARQUET_LINK_LIBS}
       LINK_PRIVATE ${LIBPARQUET_PRIVATE_LINK_LIBS})
+    if (APPLE)
+      set_target_properties(parquet_shared
+        PROPERTIES
+        BUILD_WITH_INSTALL_RPATH ON
+        INSTALL_NAME_DIR "@rpath")
+    endif()
 endif()
 
 if (PARQUET_BUILD_STATIC)
@@ -583,6 +602,22 @@ add_dependencies(parquet_objlib parquet_thrift)
 add_subdirectory(benchmarks)
 add_subdirectory(tools)
 
+# Arrow
+if (PARQUET_ARROW)
+  find_package(Arrow REQUIRED)
+  include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
+  add_library(arrow SHARED IMPORTED)
+  set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${ARROW_SHARED_LIB})
+  add_library(arrow_io SHARED IMPORTED)
+  set_target_properties(arrow_io PROPERTIES IMPORTED_LOCATION ${ARROW_IO_SHARED_LIB})
+  add_library(arrow_static STATIC IMPORTED)
+  set_target_properties(arrow_static PROPERTIES IMPORTED_LOCATION ${ARROW_STATIC_LIB})
+  add_library(arrow_io_static STATIC IMPORTED)
+  set_target_properties(arrow_io_static PROPERTIES IMPORTED_LOCATION ${ARROW_IO_STATIC_LIB})
+
+  add_subdirectory(src/parquet/arrow)
+endif()
+
 add_custom_target(clean-all
    COMMAND ${CMAKE_BUILD_TOOL} clean
    COMMAND ${CMAKE_COMMAND} -P cmake_modules/clean-all.cmake

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/ci/travis_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh
index 2e7dcdd..8794559 100755
--- a/ci/travis_script_cpp.sh
+++ b/ci/travis_script_cpp.sh
@@ -16,13 +16,13 @@ make lint
 
 if [ $TRAVIS_OS_NAME == "linux" ]; then
   make -j4 || exit 1
-  ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+  ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
   sudo pip install cpp_coveralls
   export PARQUET_ROOT=$TRAVIS_BUILD_DIR
   $TRAVIS_BUILD_DIR/ci/upload_coverage.sh
 else
   make -j4 || exit 1
-  ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+  ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
 fi
 
 popd

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/cmake_modules/FindArrow.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindArrow.cmake b/cmake_modules/FindArrow.cmake
new file mode 100644
index 0000000..91d0e71
--- /dev/null
+++ b/cmake_modules/FindArrow.cmake
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# - Find ARROW (arrow/api.h, libarrow.a, libarrow.so)
+# This module defines
+#  ARROW_INCLUDE_DIR, directory containing headers
+#  ARROW_LIBS, directory containing arrow libraries
+#  ARROW_STATIC_LIB, path to libarrow.a
+#  ARROW_SHARED_LIB, path to libarrow's shared library
+#  ARROW_FOUND, whether arrow has been found
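+#
+# Typical usage (illustrative; mirrors the top-level CMakeLists.txt):
+#   find_package(Arrow REQUIRED)
+#   include_directories(SYSTEM ${ARROW_INCLUDE_DIR})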
+
+set(ARROW_SEARCH_HEADER_PATHS
+  $ENV{ARROW_HOME}/include
+)
+
+set(ARROW_SEARCH_LIB_PATH
+  $ENV{ARROW_HOME}/lib
+)
+
+find_path(ARROW_INCLUDE_DIR arrow/array.h PATHS
+  ${ARROW_SEARCH_HEADER_PATHS}
+  # make sure we don't accidentally pick up a different version
+  NO_DEFAULT_PATH
+)
+
+find_library(ARROW_LIB_PATH NAMES arrow
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
+find_library(ARROW_IO_LIB_PATH NAMES arrow_io
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
+if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
+  set(ARROW_FOUND TRUE)
+  set(ARROW_LIB_NAME libarrow)
+  set(ARROW_IO_LIB_NAME libarrow_io)
+
+  set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
+  set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
+  set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+
+  set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a)
+  set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+  if (NOT Arrow_FIND_QUIETLY)
+    message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
+    message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}")
+  endif ()
+else ()
+  if (NOT Arrow_FIND_QUIETLY)
+    set(ARROW_ERR_MSG "Could not find the Arrow library. Looked for headers")
+    set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and for libs")
+    set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}")
+    if (Arrow_FIND_REQUIRED)
+      message(FATAL_ERROR "${ARROW_ERR_MSG}")
+    else (Arrow_FIND_REQUIRED)
+      message(STATUS "${ARROW_ERR_MSG}")
+    endif (Arrow_FIND_REQUIRED)
+  endif ()
+  set(ARROW_FOUND FALSE)
+endif ()
+
+mark_as_advanced(
+  ARROW_FOUND
+  ARROW_INCLUDE_DIR
+  ARROW_LIBS
+  ARROW_STATIC_LIB
+  ARROW_SHARED_LIB
+  ARROW_IO_STATIC_LIB
+  ARROW_IO_SHARED_LIB
+)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/conda.recipe/build.sh
----------------------------------------------------------------------
diff --git a/conda.recipe/build.sh b/conda.recipe/build.sh
index 7fe8f91..f77475c 100644
--- a/conda.recipe/build.sh
+++ b/conda.recipe/build.sh
@@ -53,6 +53,7 @@ cmake \
     -DCMAKE_BUILD_TYPE=debug \
     -DCMAKE_INSTALL_PREFIX=$PREFIX \
     -DPARQUET_BUILD_BENCHMARKS=off \
+    -DPARQUET_ARROW=ON \
     ..
 
 make

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/CMakeLists.txt b/src/parquet/arrow/CMakeLists.txt
new file mode 100644
index 0000000..5d923a7
--- /dev/null
+++ b/src/parquet/arrow/CMakeLists.txt
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ----------------------------------------------------------------------
+# parquet_arrow : Arrow <-> Parquet adapter
+
+set(PARQUET_ARROW_SRCS
+  io.cc
+  reader.cc
+  schema.cc
+  writer.cc
+)
+
+add_library(parquet_arrow_objlib OBJECT
+  ${PARQUET_ARROW_SRCS}
+)
+
+# SET_TARGET_PROPERTIES(parquet_arrow PROPERTIES LINKER_LANGUAGE CXX)
+
+if (PARQUET_BUILD_SHARED)
+    add_library(parquet_arrow_shared SHARED $<TARGET_OBJECTS:parquet_arrow_objlib>)
+    set_target_properties(parquet_arrow_shared
+      PROPERTIES
+      LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+      LINK_FLAGS "${SHARED_LINK_FLAGS}"
+      OUTPUT_NAME "parquet_arrow")
+    target_link_libraries(parquet_arrow_shared
+      arrow
+      arrow_io
+      parquet_shared)
+    if (APPLE)
+      set_target_properties(parquet_arrow_shared
+        PROPERTIES
+        BUILD_WITH_INSTALL_RPATH ON
+        INSTALL_NAME_DIR "@rpath")
+    endif()
+endif()
+
+if (PARQUET_BUILD_STATIC)
+    add_library(parquet_arrow_static STATIC $<TARGET_OBJECTS:parquet_arrow_objlib>)
+    set_target_properties(parquet_arrow_static
+      PROPERTIES
+      LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+      OUTPUT_NAME "parquet_arrow")
+    target_link_libraries(parquet_arrow_static
+      arrow_static
+      arrow_io_static
+      parquet_static)
+    install(TARGETS parquet_arrow_static
+      ARCHIVE DESTINATION lib
+      LIBRARY DESTINATION lib)
+endif()
+
+ADD_PARQUET_TEST(arrow-schema-test)
+ADD_PARQUET_TEST(arrow-io-test)
+ADD_PARQUET_TEST(arrow-reader-writer-test)
+
+if (PARQUET_BUILD_STATIC)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_static)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_static)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_static)
+else()
+  ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_shared)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_shared)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_shared)
+endif()
+
+# Headers: top level
+install(FILES
+  io.h
+  reader.h
+  schema.h
+  utils.h
+  writer.h
+  DESTINATION include/parquet/arrow)
+

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-io-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-io-test.cc b/src/parquet/arrow/arrow-io-test.cc
new file mode 100644
index 0000000..377fb97
--- /dev/null
+++ b/src/parquet/arrow/arrow-io-test.cc
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/io.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+using arrow::Status;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// Allocator tests
+
+TEST(TestParquetAllocator, DefaultCtor) {
+  ParquetAllocator allocator;
+
+  const int buffer_size = 10;
+
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size));
+
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
+
+  allocator.Free(buffer, buffer_size);
+}
+
+// Pass through to the default memory pool
+class TrackingPool : public MemoryPool {
+ public:
+  TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {}
+
+  Status Allocate(int64_t size, uint8_t** out) override {
+    RETURN_NOT_OK(pool_->Allocate(size, out));
+    bytes_allocated_ += size;
+    return Status::OK();
+  }
+
+  void Free(uint8_t* buffer, int64_t size) override {
+    pool_->Free(buffer, size);
+    bytes_allocated_ -= size;
+  }
+
+  int64_t bytes_allocated() const override { return bytes_allocated_; }
+
+ private:
+  MemoryPool* pool_;
+  int64_t bytes_allocated_;
+};
+
+TEST(TestParquetAllocator, CustomPool) {
+  TrackingPool pool;
+
+  ParquetAllocator allocator(&pool);
+
+  ASSERT_EQ(&pool, allocator.pool());
+
+  const int buffer_size = 10;
+
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size));
+
+  ASSERT_EQ(buffer_size, pool.bytes_allocated());
+
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
+
+  allocator.Free(buffer, buffer_size);
+
+  ASSERT_EQ(0, pool.bytes_allocated());
+}
+
+// ----------------------------------------------------------------------
+// Read source tests
+
+class BufferReader : public ArrowROFile {
+ public:
+  BufferReader(const uint8_t* buffer, int buffer_size)
+      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+
+  Status Close() override {
+    // no-op
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* position) override {
+    *position = position_;
+    return Status::OK();
+  }
+
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    RETURN_NOT_OK(Seek(position));
+    return Read(nbytes, bytes_read, buffer);
+  }
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    // Clamp to the bytes remaining so we never read past the end of buffer_
+    *bytes_read = std::min(nbytes, buffer_size_ - position_);
+    memcpy(buffer, buffer_ + position_, *bytes_read);
+    position_ += *bytes_read;
+    return Status::OK();
+  }
+
+  Status GetSize(int64_t* size) override {
+    *size = buffer_size_;
+    return Status::OK();
+  }
+
+  Status Seek(int64_t position) override {
+    if (position < 0 || position >= buffer_size_) {
+      return Status::IOError("position out of bounds");
+    }
+
+    position_ = position;
+    return Status::OK();
+  }
+
+ private:
+  const uint8_t* buffer_;
+  int buffer_size_;
+  int64_t position_;
+};
+
+TEST(TestParquetReadSource, Basics) {
+  std::string data = "this is the data";
+  auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
+
+  ParquetAllocator allocator(default_memory_pool());
+
+  auto file = std::make_shared<BufferReader>(data_buffer, data.size());
+  auto source = std::make_shared<ParquetReadSource>(&allocator);
+
+  ASSERT_OK(source->Open(file));
+
+  ASSERT_EQ(0, source->Tell());
+  ASSERT_NO_THROW(source->Seek(5));
+  ASSERT_EQ(5, source->Tell());
+  ASSERT_NO_THROW(source->Seek(0));
+
+  // Seek out of bounds
+  ASSERT_THROW(source->Seek(100), ParquetException);
+
+  uint8_t buffer[50];
+
+  ASSERT_NO_THROW(source->Read(4, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
+  ASSERT_EQ(4, source->Tell());
+
+  std::shared_ptr<Buffer> pq_buffer;
+
+  ASSERT_NO_THROW(pq_buffer = source->Read(7));
+
+  auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
+
+  ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
+}
+
+}  // namespace arrow
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
new file mode 100644
index 0000000..e4e9efa
--- /dev/null
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "parquet/api/reader.h"
+#include "parquet/api/writer.h"
+
+#include "parquet/arrow/test-util.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+#include "arrow/test-util.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+using arrow::Array;
+using arrow::ChunkedArray;
+using arrow::default_memory_pool;
+using arrow::PoolBuffer;
+using arrow::PrimitiveArray;
+using arrow::Status;
+using arrow::Table;
+
+using ParquetBuffer = parquet::Buffer;
+using ParquetType = parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::PrimitiveNode;
+
+namespace parquet {
+
+namespace arrow {
+
+const int SMALL_SIZE = 100;
+const int LARGE_SIZE = 10000;
+
+template <typename TestType>
+struct test_traits {};
+
+template <>
+struct test_traits<::arrow::BooleanType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static uint8_t const value;
+};
+
+const uint8_t test_traits<::arrow::BooleanType>::value(1);
+
+template <>
+struct test_traits<::arrow::UInt8Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
+  static uint8_t const value;
+};
+
+const uint8_t test_traits<::arrow::UInt8Type>::value(64);
+
+template <>
+struct test_traits<::arrow::Int8Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
+  static int8_t const value;
+};
+
+const int8_t test_traits<::arrow::Int8Type>::value(-64);
+
+template <>
+struct test_traits<::arrow::UInt16Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
+  static uint16_t const value;
+};
+
+const uint16_t test_traits<::arrow::UInt16Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int16Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
+  static int16_t const value;
+};
+
+const int16_t test_traits<::arrow::Int16Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::UInt32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
+  static uint32_t const value;
+};
+
+const uint32_t test_traits<::arrow::UInt32Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static int32_t const value;
+};
+
+const int32_t test_traits<::arrow::Int32Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::UInt64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
+  static uint64_t const value;
+};
+
+const uint64_t test_traits<::arrow::UInt64Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::Int64Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::TimestampType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS;
+  static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
+
+template <>
+struct test_traits<::arrow::FloatType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static float const value;
+};
+
+const float test_traits<::arrow::FloatType>::value(2.1f);
+
+template <>
+struct test_traits<::arrow::DoubleType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static double const value;
+};
+
+const double test_traits<::arrow::DoubleType>::value(4.2);
+
+template <>
+struct test_traits<::arrow::StringType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
+  static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
+  static std::string const value;
+};
+
+const std::string test_traits<::arrow::StringType>::value("Test");
+
+template <typename T>
+using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
+
+template <typename T>
+using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
+
+template <typename TestType>
+class TestParquetIO : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
+    auto pnode = PrimitiveNode::Make("column1", repetition,
+        test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
+    NodePtr node =
+        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+    return std::static_pointer_cast<GroupNode>(node);
+  }
+
+  std::unique_ptr<ParquetFileWriter> MakeWriter(
+      const std::shared_ptr<GroupNode>& schema) {
+    sink_ = std::make_shared<InMemoryOutputStream>();
+    return ParquetFileWriter::Open(sink_, schema);
+  }
+
+  std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+    std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+    return ParquetFileReader::Open(std::move(source));
+  }
+
+  void ReadSingleColumnFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+    FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+    std::unique_ptr<FlatColumnReader> column_reader;
+    ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
+    ASSERT_NE(nullptr, column_reader.get());
+
+    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
+    std::shared_ptr<::arrow::Array> out;
+    ReadSingleColumnFile(ReaderFromSink(), &out);
+    ASSERT_TRUE(values->Equals(out));
+  }
+
+  void ReadTableFromFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+    FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+    ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+    std::shared_ptr<::arrow::Table> out;
+    ReadTableFromFile(ReaderFromSink(), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(values->length(), out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+  }
+
+  template <typename ArrayType>
+  void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+      const std::shared_ptr<ArrayType>& values) {
+    FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+    ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+    ASSERT_OK_NO_THROW(writer.Close());
+  }
+
+  std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+// We have separate tests for UInt32Type, as it is currently the only type
+// where a roundtrip does not yield an identical Array structure: with
+// Parquet version 1.0 we write a UInt32 Array but read back an Int64 Array.
+
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
+    ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
+    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
+    ::arrow::StringType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  this->WriteFlatColumn(schema, values);
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+      this->sink_, values->length(), default_writer_properties()));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  this->WriteFlatColumn(schema, values);
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+      this->sink_, values->length(), default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  int64_t chunk_size = values->length() / 4;
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(LARGE_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+  int64_t chunk_size = SMALL_SIZE / 4;
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+  // This also tests max_definition_level = 1
+  auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+      this->sink_, 512, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
+
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compatibility) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values =
+      NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 2.0 roundtrip should yield an uint32_t column again
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_2_0)
+          ->build();
+  ASSERT_OK_NO_THROW(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values =
+      NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
+  // reader that a column is unsigned.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_1_0)
+          ->build();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties));
+
+  std::shared_ptr<Array> expected_values;
+  std::shared_ptr<PoolBuffer> int64_data =
+      std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
+  {
+    ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
+    int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+    const uint32_t* uint32_data_ptr =
+        reinterpret_cast<const uint32_t*>(values->data()->data());
+    // std::copy might be faster, but this is explicit about the casts
+    for (int64_t i = 0; i < values->length(); i++) {
+      int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
+    }
+  }
+  ASSERT_OK(MakePrimitiveArray(std::make_shared<::arrow::Int64Type>(), values->length(),
+      int64_data, values->null_count(), values->null_bitmap(), &expected_values));
+  this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+template <typename T>
+using ParquetCDataType = typename ParquetDataType<T>::c_type;
+
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+  typedef typename TestType::c_type T;
+
+  void MakeTestFile(std::vector<T>& values, int num_chunks,
+      std::unique_ptr<ParquetFileReader>* file_reader) {
+    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+    std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+    size_t chunk_size = values.size() / num_chunks;
+    // Convert to Parquet's expected physical type
+    std::vector<uint8_t> values_buffer(
+        sizeof(ParquetCDataType<TestType>) * values.size());
+    auto values_parquet =
+        reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+    std::copy(values.cbegin(), values.cend(), values_parquet);
+    for (int i = 0; i < num_chunks; i++) {
+      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+      auto column_writer =
+          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+      ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
+      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+      column_writer->Close();
+      row_group_writer->Close();
+    }
+    file_writer->Close();
+    *file_reader = this->ReaderFromSink();
+  }
+
+  void CheckSingleColumnRequiredTableRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Table> out;
+    this->ReadTableFromFile(std::move(file_reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
+  }
+
+  void CheckSingleColumnRequiredRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Array> out;
+    this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+    ExpectArray<TestType>(values.data(), out.get());
+  }
+};
+
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
+    ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type,
+    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
+    ::arrow::DoubleType> PrimitiveTestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+  this->CheckSingleColumnRequiredRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+  this->CheckSingleColumnRequiredTableRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+  this->CheckSingleColumnRequiredRead(4);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+  this->CheckSingleColumnRequiredTableRead(4);
+}
+
+}  // namespace arrow
+
+}  // namespace parquet
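
The tests above exercise the full write/read round trip. As a reference for
callers, here is a minimal read-side sketch using the FileReader API added in
this patch (reader properties and error context omitted; `file` is any
::arrow::io::RandomAccessFile implementation, RETURN_NOT_OK comes from
arrow/util/status.h, and the function name is illustrative):

    #include <memory>
    #include "parquet/arrow/io.h"
    #include "parquet/arrow/reader.h"
    #include "arrow/util/status.h"

    // Open a Parquet file through the Arrow IO bridge and materialize its
    // contents as a flat Arrow table.
    ::arrow::Status ReadToTable(
        const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
        std::shared_ptr<::arrow::Table>* out) {
      parquet::arrow::ParquetAllocator allocator;
      std::unique_ptr<parquet::arrow::FileReader> reader;
      RETURN_NOT_OK(parquet::arrow::OpenFile(file, &allocator, &reader));
      return reader->ReadFlatTable(out);
    }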

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
new file mode 100644
index 0000000..3dfaf14
--- /dev/null
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/schema.h"
+
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+#include "arrow/types/datetime.h"
+#include "arrow/types/decimal.h"
+#include "arrow/util/status.h"
+
+using arrow::Field;
+
+using ParquetType = parquet::Type;
+using parquet::LogicalType;
+using parquet::Repetition;
+using parquet::schema::NodePtr;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+namespace parquet {
+
+namespace arrow {
+
+const auto BOOL = std::make_shared<::arrow::BooleanType>();
+const auto UINT8 = std::make_shared<::arrow::UInt8Type>();
+const auto INT32 = std::make_shared<::arrow::Int32Type>();
+const auto INT64 = std::make_shared<::arrow::Int64Type>();
+const auto FLOAT = std::make_shared<::arrow::FloatType>();
+const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
+const auto UTF8 = std::make_shared<::arrow::StringType>();
+const auto TIMESTAMP_MS =
+    std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+// TODO: This requires parquet-cpp implementing the MICROS enum value
+// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
+const auto BINARY =
+    std::make_shared<::arrow::ListType>(std::make_shared<Field>("", UINT8));
+const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
+
+class TestConvertParquetSchema : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  void CheckFlatSchema(const std::shared_ptr<::arrow::Schema>& expected_schema) {
+    ASSERT_EQ(expected_schema->num_fields(), result_schema_->num_fields());
+    for (int i = 0; i < expected_schema->num_fields(); ++i) {
+      auto lhs = result_schema_->field(i);
+      auto rhs = expected_schema->field(i);
+      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
+                                    << " != " << rhs->ToString();
+    }
+  }
+
+  ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes) {
+    NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+    descr_.Init(schema);
+    return FromParquetSchema(&descr_, &result_schema_);
+  }
+
+ protected:
+  SchemaDescriptor descr_;
+  std::shared_ptr<::arrow::Schema> result_schema_;
+};
+
+TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
+  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
+  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
+  arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+
+  parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+      ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+
+  // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+  //     ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+  // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
+  arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
+  arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY));
+  arrow_fields.push_back(std::make_shared<Field>("binary", BINARY));
+
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8));
+  arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+
+  parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL,
+      ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12));
+  arrow_fields.push_back(std::make_shared<Field>("flba-binary", BINARY));
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
+TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  parquet_fields.push_back(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL,
+      ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4));
+  arrow_fields.push_back(std::make_shared<Field>("flba-decimal", DECIMAL_8_4));
+
+  parquet_fields.push_back(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL,
+      ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4));
+  arrow_fields.push_back(std::make_shared<Field>("binary-decimal", DECIMAL_8_4));
+
+  parquet_fields.push_back(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL,
+      ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4));
+  arrow_fields.push_back(std::make_shared<Field>("int32-decimal", DECIMAL_8_4));
+
+  parquet_fields.push_back(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL,
+      ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4));
+  arrow_fields.push_back(std::make_shared<Field>("int64-decimal", DECIMAL_8_4));
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
+TEST_F(TestConvertParquetSchema, UnsupportedThings) {
+  std::vector<NodePtr> unsupported_nodes;
+
+  unsupported_nodes.push_back(
+      PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));
+
+  unsupported_nodes.push_back(
+      GroupNode::Make("repeated-group", Repetition::REPEATED, {}));
+
+  unsupported_nodes.push_back(PrimitiveNode::Make(
+      "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
+
+  for (const NodePtr& node : unsupported_nodes) {
+    ASSERT_RAISES(NotImplemented, ConvertSchema({node}));
+  }
+}
+
+class TestConvertArrowSchema : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  void CheckFlatSchema(const std::vector<NodePtr>& nodes) {
+    NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+    const GroupNode* expected_schema_node =
+        static_cast<const GroupNode*>(schema_node.get());
+    const GroupNode* result_schema_node = result_schema_->group_node();
+
+    ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count());
+
+    for (int i = 0; i < expected_schema_node->field_count(); i++) {
+      auto lhs = result_schema_node->field(i);
+      auto rhs = expected_schema_node->field(i);
+      EXPECT_TRUE(lhs->Equals(rhs.get()));
+    }
+  }
+
+  ::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
+    arrow_schema_ = std::make_shared<::arrow::Schema>(fields);
+    std::shared_ptr<::parquet::WriterProperties> properties =
+        ::parquet::default_writer_properties();
+    return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_);
+  }
+
+ protected:
+  std::shared_ptr<::arrow::Schema> arrow_schema_;
+  std::shared_ptr<SchemaDescriptor> result_schema_;
+};
+
+TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
+  arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
+  arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
+  arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+
+  parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+      ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+
+  // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+  //     ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
+  // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
+  arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
+  arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));
+
+  // TODO: String types need to be clarified a bit more in the Arrow spec
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8));
+  arrow_fields.push_back(std::make_shared<Field>("string", UTF8));
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+
+  CheckFlatSchema(parquet_fields);
+}
+
+TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  // TODO: Test Decimal Arrow -> Parquet conversion
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+
+  CheckFlatSchema(parquet_fields);
+}
+
+TEST(TestNodeConversion, DateAndTime) {}
+
+}  // namespace arrow
+
+}  // namespace parquet
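
The two conversion entry points exercised by these tests can be chained into
a schema round trip. A sketch under the signatures used above, with no claim
about types that only convert one way (e.g. decimals); the function name is
illustrative:

    #include <memory>
    #include "parquet/arrow/schema.h"

    // Parquet -> Arrow -> Parquet schema round trip.
    ::arrow::Status RoundTripSchema(const parquet::SchemaDescriptor* descr,
        std::shared_ptr<parquet::SchemaDescriptor>* out) {
      std::shared_ptr<::arrow::Schema> arrow_schema;
      RETURN_NOT_OK(parquet::arrow::FromParquetSchema(descr, &arrow_schema));
      auto properties = parquet::default_writer_properties();
      return parquet::arrow::ToParquetSchema(
          arrow_schema.get(), *properties.get(), out);
    }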

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/io.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc
new file mode 100644
index 0000000..8e2645a
--- /dev/null
+++ b/src/parquet/arrow/io.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/io.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/util/status.h"
+
+using arrow::Status;
+using arrow::MemoryPool;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// ParquetAllocator
+
+ParquetAllocator::ParquetAllocator() : pool_(::arrow::default_memory_pool()) {}
+
+ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}
+
+ParquetAllocator::~ParquetAllocator() {}
+
+uint8_t* ParquetAllocator::Malloc(int64_t size) {
+  uint8_t* result;
+  PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result));
+  return result;
+}
+
+void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
+  // Does not report Status
+  pool_->Free(buffer, size);
+}
+
+// ----------------------------------------------------------------------
+// ParquetReadSource
+
+ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator)
+    : file_(nullptr), allocator_(allocator) {}
+
+Status ParquetReadSource::Open(const std::shared_ptr<ArrowROFile>& file) {
+  int64_t file_size;
+  RETURN_NOT_OK(file->GetSize(&file_size));
+
+  file_ = file;
+  size_ = file_size;
+  return Status::OK();
+}
+
+void ParquetReadSource::Close() {
+  // TODO(wesm): This is a no-op for now, which leaves the Python wrappers for
+  // these classes in a broken state. It would probably be better to close the
+  // file explicitly.
+
+  // PARQUET_THROW_NOT_OK(file_->Close());
+}
+
+int64_t ParquetReadSource::Tell() const {
+  int64_t position;
+  PARQUET_THROW_NOT_OK(file_->Tell(&position));
+  return position;
+}
+
+void ParquetReadSource::Seek(int64_t position) {
+  PARQUET_THROW_NOT_OK(file_->Seek(position));
+}
+
+int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
+  int64_t bytes_read;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
+  return bytes_read;
+}
+
+std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
+  // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
+  // that there should be more code sharing amongst file-like sources
+  auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_);
+  result->Resize(nbytes);
+
+  int64_t bytes_read = Read(nbytes, result->mutable_data());
+  if (bytes_read < nbytes) { result->Resize(bytes_read); }
+  return result;
+}
+
+}  // namespace arrow
+}  // namespace parquet
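
Note the short-read contract implemented in the Buffer-returning Read above:
the result is shrunk when the underlying file yields fewer bytes than
requested, so callers should consult the buffer's size rather than assume
they received nbytes. A sketch:

    // source is an opened parquet::arrow::ParquetReadSource
    std::shared_ptr<parquet::Buffer> buffer = source->Read(4096);
    if (buffer->size() < 4096) {
      // End of file was reached; only the first buffer->size() bytes are valid
    }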

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/io.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h
new file mode 100644
index 0000000..dc60635
--- /dev/null
+++ b/src/parquet/arrow/io.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces
+
+#ifndef PARQUET_ARROW_IO_H
+#define PARQUET_ARROW_IO_H
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/memory-pool.h"
+
+namespace parquet {
+
+namespace arrow {
+
+// An implementation of the Parquet MemoryAllocator API that plugs into an
+// existing Arrow memory pool. This way we can direct all allocations to a
+// single place rather than tracking them across multiple allocators (for
+// example, parquet-cpp's default allocator).
+class PARQUET_EXPORT ParquetAllocator : public MemoryAllocator {
+ public:
+  // Uses the default memory pool
+  ParquetAllocator();
+
+  explicit ParquetAllocator(::arrow::MemoryPool* pool);
+  virtual ~ParquetAllocator();
+
+  uint8_t* Malloc(int64_t size) override;
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  void set_pool(::arrow::MemoryPool* pool) { pool_ = pool; }
+
+  ::arrow::MemoryPool* pool() const { return pool_; }
+
+ private:
+  ::arrow::MemoryPool* pool_;
+};
+
+class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
+ public:
+  explicit ParquetReadSource(ParquetAllocator* allocator);
+
+  // We need to ask for the file size on opening the file, and this can fail
+  ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file);
+
+  void Close() override;
+  int64_t Tell() const override;
+  void Seek(int64_t pos) override;
+  int64_t Read(int64_t nbytes, uint8_t* out) override;
+  std::shared_ptr<Buffer> Read(int64_t nbytes) override;
+
+ private:
+  // An Arrow readable file of some kind
+  std::shared_ptr<::arrow::io::RandomAccessFile> file_;
+
+  // The allocator is required for creating managed buffers
+  ParquetAllocator* allocator_;
+};
+
+}  // namespace arrow
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_IO_H
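
Because the file size must be known up front, a ParquetReadSource is
constructed first and then opened in a separate, fallible step. A usage
sketch mirroring the Basics test earlier in this patch (RETURN_NOT_OK from
arrow/util/status.h; the function name is illustrative):

    #include <memory>
    #include <utility>
    #include "parquet/arrow/io.h"

    ::arrow::Status MakeReadSource(
        const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
        parquet::arrow::ParquetAllocator* allocator,
        std::unique_ptr<parquet::arrow::ParquetReadSource>* out) {
      std::unique_ptr<parquet::arrow::ParquetReadSource> source(
          new parquet::arrow::ParquetReadSource(allocator));
      RETURN_NOT_OK(source->Open(file));  // fails if GetSize() fails
      *out = std::move(source);
      return ::arrow::Status::OK();
    }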

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
new file mode 100644
index 0000000..056b5ab
--- /dev/null
+++ b/src/parquet/arrow/reader.cc
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/reader.h"
+
+#include <algorithm>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "parquet/arrow/io.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/column.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::Array;
+using arrow::Column;
+using arrow::Field;
+using arrow::MemoryPool;
+using arrow::PoolBuffer;
+using arrow::Status;
+using arrow::Table;
+
+// Help reduce verbosity
+using ParquetRAS = parquet::RandomAccessSource;
+using ParquetReader = parquet::ParquetFileReader;
+
+namespace parquet {
+namespace arrow {
+
+template <typename ArrowType>
+struct ArrowTypeTraits {
+  typedef ::arrow::NumericBuilder<ArrowType> builder_type;
+};
+
+template <>
+struct ArrowTypeTraits<BooleanType> {
+  typedef ::arrow::BooleanBuilder builder_type;
+};
+
+template <typename ArrowType>
+using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;
+
+class FileReader::Impl {
+ public:
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+  virtual ~Impl() {}
+
+  bool CheckForFlatColumn(const ColumnDescriptor* descr);
+  Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+  Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+  Status ReadFlatTable(std::shared_ptr<Table>* out);
+
+ private:
+  MemoryPool* pool_;
+  std::unique_ptr<ParquetFileReader> reader_;
+};
+
+class FlatColumnReader::Impl {
+ public:
+  Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+      ParquetFileReader* reader, int column_index);
+  virtual ~Impl() {}
+
+  Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
+  template <typename ArrowType, typename ParquetType>
+  Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+
+  template <typename ArrowType, typename ParquetType>
+  Status ReadNullableFlatBatch(const int16_t* def_levels,
+      typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+      BuilderType<ArrowType>* builder);
+  template <typename ArrowType, typename ParquetType>
+  Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
+      BuilderType<ArrowType>* builder);
+
+ private:
+  void NextRowGroup();
+
+  template <typename InType, typename OutType>
+  struct can_copy_ptr {
+    static constexpr bool value =
+        std::is_same<InType, OutType>::value ||
+        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+            (sizeof(InType) == sizeof(OutType)));
+  };
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+    return Status::OK();
+  }
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<!can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
+    OutType* mutable_out_ptr =
+        reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
+    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+    *out_ptr = mutable_out_ptr;
+    return Status::OK();
+  }
+
+  MemoryPool* pool_;
+  const ColumnDescriptor* descr_;
+  ParquetFileReader* reader_;
+  int column_index_;
+  int next_row_group_;
+  std::shared_ptr<ColumnReader> column_reader_;
+  std::shared_ptr<Field> field_;
+
+  PoolBuffer values_buffer_;
+  PoolBuffer def_levels_buffer_;
+  PoolBuffer values_builder_buffer_;
+  PoolBuffer valid_bytes_buffer_;
+};
+
+FileReader::Impl::Impl(
+    MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+    : pool_(pool), reader_(std::move(reader)) {}
+
+bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) {
+  if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
+    return false;
+  } else if ((descr->max_definition_level() == 1) &&
+             (descr->schema_node()->repetition() != Repetition::OPTIONAL)) {
+    return false;
+  }
+  return true;
+}
+
+Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+  const SchemaDescriptor* schema = reader_->metadata()->schema();
+
+  if (!CheckForFlatColumn(schema->Column(i))) {
+    return Status::Invalid("The requested column is not flat");
+  }
+  std::unique_ptr<FlatColumnReader::Impl> impl(
+      new FlatColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
+  *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
+  return Status::OK();
+}
+
+Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+  std::unique_ptr<FlatColumnReader> flat_column_reader;
+  RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader));
+  return flat_column_reader->NextBatch(reader_->metadata()->num_rows(), out);
+}
+
+Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+  auto descr = reader_->metadata()->schema();
+
+  const std::string& name = descr->name();
+  std::shared_ptr<::arrow::Schema> schema;
+  RETURN_NOT_OK(FromParquetSchema(descr, &schema));
+
+  int num_columns = reader_->metadata()->num_columns();
+
+  std::vector<std::shared_ptr<Column>> columns(num_columns);
+  for (int i = 0; i < num_columns; i++) {
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(ReadFlatColumn(i, &array));
+    columns[i] = std::make_shared<Column>(schema->field(i), array);
+  }
+
+  *table = std::make_shared<Table>(name, schema, columns);
+  return Status::OK();
+}
+
+FileReader::FileReader(
+    MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+    : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+
+FileReader::~FileReader() {}
+
+// Static ctor
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<ParquetReadSource> source(new ParquetReadSource(allocator));
+  RETURN_NOT_OK(source->Open(file));
+
+  // TODO(wesm): reader properties
+  std::unique_ptr<ParquetReader> pq_reader;
+  PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source)));
+
+  // Use the same memory pool as the ParquetAllocator
+  reader->reset(new FileReader(allocator->pool(), std::move(pq_reader)));
+  return Status::OK();
+}
+
+Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+  return impl_->GetFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+  return impl_->ReadFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+  return impl_->ReadFlatTable(out);
+}
+
+FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+    ParquetFileReader* reader, int column_index)
+    : pool_(pool),
+      descr_(descr),
+      reader_(reader),
+      column_index_(column_index),
+      next_row_group_(0),
+      values_buffer_(pool),
+      def_levels_buffer_(pool) {
+  NodeToField(descr_->schema_node(), &field_);
+  NextRowGroup();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
+    int64_t values_read, BuilderType<ArrowType>* builder) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  DCHECK(builder);
+  const ArrowCType* values_ptr = nullptr;
+  RETURN_NOT_OK(
+      (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr)));
+  RETURN_NOT_OK(builder->Append(values_ptr, values_read));
+  return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
+    typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+    BuilderType<ArrowType>* builder) {
+  using ArrowCType = typename ArrowType::c_type;
+
+  DCHECK(builder);
+  RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
+  RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+  auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
+  uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+  int values_idx = 0;
+  for (int64_t i = 0; i < levels_read; i++) {
+    if (def_levels[i] < descr_->max_definition_level()) {
+      valid_bytes[i] = 0;
+    } else {
+      valid_bytes[i] = 1;
+      values_ptr[i] = values[values_idx++];
+    }
+  }
+  RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
+  return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::TypedReadBatch(
+    int batch_size, std::shared_ptr<Array>* out) {
+  using ParquetCType = typename ParquetType::c_type;
+
+  int values_to_read = batch_size;
+  BuilderType<ArrowType> builder(pool_, field_->type);
+  while ((values_to_read > 0) && column_reader_) {
+    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType)));
+    if (descr_->max_definition_level() > 0) {
+      RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
+    }
+    auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, &values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      RETURN_NOT_OK(
+          (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
+    } else {
+      // As per the definition and checks for flat columns:
+      // descr_->max_definition_level() == 1
+      RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
+          def_levels, values, values_read, levels_read, &builder)));
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+  *out = builder.Finish();
+  return Status::OK();
+}
+
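+// Specialization for BYTE_ARRAY columns: the ByteArray values point into
+// the column reader's scratch buffers, so each string's bytes are copied
+// into the StringBuilder rather than referenced.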
+template <>
+Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
+    int batch_size, std::shared_ptr<Array>* out) {
+  int values_to_read = batch_size;
+  ::arrow::StringBuilder builder(pool_, field_->type);
+  while ((values_to_read > 0) && column_reader_) {
+    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray)));
+    if (descr_->max_definition_level() > 0) {
+      RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
+    }
+    auto reader =
+        dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, &values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      for (int64_t i = 0; i < levels_read; i++) {
+        RETURN_NOT_OK(
+            builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
+      }
+    } else {
+      // descr_->max_definition_level() == 1
+      int values_idx = 0;
+      for (int64_t i = 0; i < levels_read; i++) {
+        if (def_levels[i] < descr_->max_definition_level()) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else {
+          RETURN_NOT_OK(
+              builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
+                  values[values_idx].len));
+          values_idx++;
+        }
+      }
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+  *out = builder.Finish();
+  return Status::OK();
+}
+
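+// Dispatch on the Arrow type, pairing it with the Parquet physical type
+// the data is decoded from. Parquet stores small and unsigned integers
+// with the INT32/INT64 physical types, so those cases decode as 32-/64-bit
+// integers and are converted to the narrower Arrow representation.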
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+  case ::arrow::Type::ENUM:                            \
+    return TypedReadBatch<ArrowType, ParquetType>(batch_size, out);
+
+Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+  if (!column_reader_) {
+    // Exhausted all row groups.
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  switch (field_->type->type) {
+    TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+    TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
+    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    default:
+      return Status::NotImplemented(field_->type->ToString());
+  }
+}
+
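+// Advance to this column's chunk in the next row group, or reset the
+// column reader to null (signalling exhaustion to NextBatch) once all
+// row groups have been read.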
+void FlatColumnReader::Impl::NextRowGroup() {
+  if (next_row_group_ < reader_->metadata()->num_row_groups()) {
+    column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
+    next_row_group_++;
+  } else {
+    column_reader_ = nullptr;
+  }
+}
+
+FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+
+FlatColumnReader::~FlatColumnReader() {}
+
+Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+  return impl_->NextBatch(batch_size, out);
+}
+
+}  // namespace arrow
+}  // namespace parquet
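
A usage sketch of the reader API implemented above (not part of the patch
itself): opening a file through the Arrow IO adapter and materializing all
of its flat columns as a Table could look roughly like the following. The
RandomAccessFile is assumed to be provided by the application, and the
ParquetAllocator constructor taking a MemoryPool* is assumed from this
patch's parquet/arrow/io.h.

#include <memory>

#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"
#include "parquet/arrow/reader.h"

::arrow::Status ReadWholeFile(
    const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
    std::shared_ptr<::arrow::Table>* table) {
  // Wrap the default Arrow memory pool (constructor assumed from io.h).
  parquet::arrow::ParquetAllocator allocator(::arrow::default_memory_pool());

  std::unique_ptr<parquet::arrow::FileReader> reader;
  RETURN_NOT_OK(parquet::arrow::OpenFile(file, &allocator, &reader));

  // Read every flat column in full and assemble a Table named after the
  // schema root.
  return reader->ReadFlatTable(table);
}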

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
new file mode 100644
index 0000000..e725728
--- /dev/null
+++ b/src/parquet/arrow/reader.h
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_READER_H
+#define PARQUET_ARROW_READER_H
+
+#include <memory>
+
+#include "parquet/api/reader.h"
+#include "parquet/api/schema.h"
+#include "parquet/arrow/io.h"
+
+#include "arrow/io/interfaces.h"
+
+namespace arrow {
+
+class Array;
+class MemoryPool;
+class RowBatch;
+class Status;
+class Table;
+}  // namespace arrow
+
+namespace parquet {
+
+namespace arrow {
+
+class FlatColumnReader;
+
+// Arrow read adapter class for deserializing Parquet files as Arrow row
+// batches.
+//
+// TODO(wesm): nested data does not always make sense with this user
+// interface unless you are only reading a single leaf node from a branch of
+// a table. For example:
+//
+// repeated group data {
+//   optional group record {
+//     optional int32 val1;
+//     optional byte_array val2;
+//     optional bool val3;
+//   }
+//   optional int32 val4;
+// }
+//
+// In the Parquet file, there are 4 leaf nodes:
+//
+// * data.record.val1
+// * data.record.val2
+// * data.record.val3
+// * data.val4
+//
+// When materializing this data in an Arrow array, we would have:
+//
+// data: list<struct<
+//   record: struct<
+//    val1: int32,
+//    val2: string (= list<uint8>),
+//    val3: bool,
+//   >,
+//   val4: int32
+// >>
+//
+// However, in the Parquet format, each leaf node has its own repetition and
+// definition levels describing the structure of the intermediate nodes in
+// this array structure. Thus, we will need to scan the leaf data for a group
+// of leaf nodes that belong to the same type tree in order to create a
+// single resulting Arrow nested array structure.
+//
+// This is additionally complicated by "chunky" repeated fields and by very
+// large byte arrays.
+class PARQUET_EXPORT FileReader {
+ public:
+  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+
+  // Since the distribution of columns amongst a Parquet file's row groups may
+  // be uneven (the number of values in each column chunk can differ), we
+  // provide a column-oriented read interface: the FlatColumnReader hides the
+  // details of paging through the file's row groups and yields
+  // fully-materialized arrow::Array instances.
+  //
+  // Returns error status if the column of interest is not flat.
+  ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+  // Read a column in its entirety into an Array.
+  ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);
+  // Read all of the file's flat columns into a Table.
+  ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);
+
+  virtual ~FileReader();
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+// At this point, the column reader is a stream iterator. It only knows how to
+// read the next batch of values for a particular column from the file until it
+// runs out.
+//
+// We also do not expose any internal Parquet details, such as row groups. This
+// might change in the future.
+class PARQUET_EXPORT FlatColumnReader {
+ public:
+  virtual ~FlatColumnReader();
+
+  // Scan the next array of the indicated size. The actual size of the
+  // returned array may be less than the requested size, depending on how
+  // much data is available in the file.
+  //
+  // When all the data in the file has been exhausted, the result is set to
+  // nullptr.
+  //
+  // Returns Status::OK on a successful read, including when the data in the
+  // file has already been exhausted.
+  ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out);
+
+ private:
+  class PARQUET_NO_EXPORT Impl;
+  std::unique_ptr<Impl> impl_;
+  explicit FlatColumnReader(std::unique_ptr<Impl> impl);
+
+  friend class FileReader;
+};
+
+// Helper function to create a file reader from an implementation of an Arrow
+// readable file
+PARQUET_EXPORT
+::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader);
+
+}  // namespace arrow
+}  // namespace parquet
+
+#endif  // PARQUET_ARROW_READER_H
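
A corresponding sketch (again not part of the patch) of the streaming
column interface: batches are pulled until NextBatch sets the output array
to nullptr, which signals that all row groups are exhausted while still
returning Status::OK. The batch size of 4096 is illustrative, and
"parquet/arrow/reader.h" and "arrow/util/status.h" (for RETURN_NOT_OK) are
assumed to be included.

::arrow::Status ScanColumn(
    parquet::arrow::FileReader* reader, int column_index) {
  std::unique_ptr<parquet::arrow::FlatColumnReader> column;
  RETURN_NOT_OK(reader->GetFlatColumn(column_index, &column));

  const int kBatchSize = 4096;  // illustrative batch size
  while (true) {
    std::shared_ptr<::arrow::Array> batch;
    RETURN_NOT_OK(column->NextBatch(kBatchSize, &batch));
    if (batch == nullptr) break;  // all data consumed; status was still OK
    // ... process up to kBatchSize values held by `batch` ...
  }
  return ::arrow::Status::OK();
}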