Posted to commits@arrow.apache.org by we...@apache.org on 2016/06/29 18:57:15 UTC

arrow git commit: ARROW-215: Support other integer types and strings in Parquet I/O

Repository: arrow
Updated Branches:
  refs/heads/master ef9083029 -> 2f52cf4ee


ARROW-215: Support other integer types and strings in Parquet I/O

Change-Id: I72c6c82bc38c895a04172531bebbc78d4fb08732


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2f52cf4e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2f52cf4e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2f52cf4e

Branch: refs/heads/master
Commit: 2f52cf4eed1033d1bf1f043d9063e462e60d6605
Parents: ef90830
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sun Jun 12 11:48:10 2016 +0200
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Tue Jun 28 21:32:32 2016 +0200

----------------------------------------------------------------------
 cpp/src/arrow/parquet/parquet-io-test.cc     | 461 ++++++++++++++--------
 cpp/src/arrow/parquet/parquet-schema-test.cc |   4 +-
 cpp/src/arrow/parquet/reader.cc              | 160 +++++++-
 cpp/src/arrow/parquet/schema.cc              |  47 ++-
 cpp/src/arrow/parquet/schema.h               |   9 +-
 cpp/src/arrow/parquet/test-util.h            | 136 ++++++-
 cpp/src/arrow/parquet/writer.cc              | 234 +++++++++--
 cpp/src/arrow/parquet/writer.h               |   9 +-
 cpp/src/arrow/test-util.h                    |   2 +
 cpp/src/arrow/types/primitive.cc             |   5 +
 python/pyarrow/includes/parquet.pxd          |  13 +-
 python/pyarrow/parquet.pyx                   |  22 +-
 python/pyarrow/tests/test_parquet.py         |  43 +-
 13 files changed, 901 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
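The API change at the heart of this commit is visible throughout the diff below: WriteFlatTable now takes a ::parquet::WriterProperties argument, and schema conversion is version-aware. A minimal round-trip sketch, assuming the InMemoryOutputStream sink and the MakeSimpleTable/ReaderFromSink test helpers from parquet-io-test.cc, written as if inside a Status-returning function:

    // Hedged sketch of the round-trip the tests below exercise;
    // fixture setup and error details are elided.
    std::shared_ptr<Table> table = MakeSimpleTable(values, /*nullable=*/true);
    auto sink = std::make_shared<InMemoryOutputStream>();
    RETURN_NOT_OK(WriteFlatTable(table.get(), default_memory_pool(), sink,
        /*chunk_size=*/512, default_writer_properties()));

    // ReaderFromSink() is a test-fixture helper returning a
    // std::unique_ptr<ParquetFileReader> over the written buffer.
    arrow::parquet::FileReader reader(default_memory_pool(), ReaderFromSink());
    std::shared_ptr<Table> result;
    RETURN_NOT_OK(reader.ReadFlatTable(&result));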


http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index edcac88..572cae1 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -21,7 +21,9 @@
 #include "arrow/parquet/test-util.h"
 #include "arrow/parquet/reader.h"
 #include "arrow/parquet/writer.h"
+#include "arrow/types/construct.h"
 #include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
 #include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 
@@ -30,12 +32,15 @@
 
 using ParquetBuffer = parquet::Buffer;
 using parquet::BufferReader;
+using parquet::default_writer_properties;
 using parquet::InMemoryOutputStream;
+using parquet::LogicalType;
 using parquet::ParquetFileReader;
 using parquet::ParquetFileWriter;
 using parquet::RandomAccessSource;
 using parquet::Repetition;
 using parquet::SchemaDescriptor;
+using parquet::ParquetVersion;
 using ParquetType = parquet::Type;
 using parquet::schema::GroupNode;
 using parquet::schema::NodePtr;
@@ -52,25 +57,113 @@ template <typename TestType>
 struct test_traits {};
 
 template <>
+struct test_traits<BooleanType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static uint8_t const value;
+};
+
+const uint8_t test_traits<BooleanType>::value(1);
+
+template <>
+struct test_traits<UInt8Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
+  static uint8_t const value;
+};
+
+const uint8_t test_traits<UInt8Type>::value(64);
+
+template <>
+struct test_traits<Int8Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
+  static int8_t const value;
+};
+
+const int8_t test_traits<Int8Type>::value(-64);
+
+template <>
+struct test_traits<UInt16Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
+  static uint16_t const value;
+};
+
+const uint16_t test_traits<UInt16Type>::value(1024);
+
+template <>
+struct test_traits<Int16Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
+  static int16_t const value;
+};
+
+const int16_t test_traits<Int16Type>::value(-1024);
+
+template <>
+struct test_traits<UInt32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
+  static uint32_t const value;
+};
+
+const uint32_t test_traits<UInt32Type>::value(1024);
+
+template <>
 struct test_traits<Int32Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static int32_t const value;
+};
+
+const int32_t test_traits<Int32Type>::value(-1024);
+
+template <>
+struct test_traits<UInt64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
+  static uint64_t const value;
 };
 
+const uint64_t test_traits<UInt64Type>::value(1024);
+
 template <>
 struct test_traits<Int64Type> {
   static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static int64_t const value;
 };
 
+const int64_t test_traits<Int64Type>::value(-1024);
+
 template <>
 struct test_traits<FloatType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static float const value;
 };
 
+const float test_traits<FloatType>::value(2.1f);
+
 template <>
 struct test_traits<DoubleType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static double const value;
+};
+
+const double test_traits<DoubleType>::value(4.2);
+
+template <>
+struct test_traits<StringType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
+  static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
+  static std::string const value;
 };
 
+const std::string test_traits<StringType>::value("Test");
+
 template <typename T>
 using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>;
 
@@ -80,18 +173,18 @@ using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>;
 template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
-  typedef typename TestType::c_type T;
   virtual void SetUp() {}
 
-  std::shared_ptr<GroupNode> MakeSchema(
-      ParquetType::type parquet_type, Repetition::type repetition) {
-    auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type);
+  std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
+    auto pnode = PrimitiveNode::Make("column1", repetition,
+        test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
     NodePtr node_ =
         GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
     return std::static_pointer_cast<GroupNode>(node_);
   }
 
-  std::unique_ptr<ParquetFileWriter> MakeWriter(std::shared_ptr<GroupNode>& schema) {
+  std::unique_ptr<ParquetFileWriter> MakeWriter(
+      const std::shared_ptr<GroupNode>& schema) {
     sink_ = std::make_shared<InMemoryOutputStream>();
     return ParquetFileWriter::Open(sink_, schema);
   }
@@ -106,113 +199,74 @@ class TestParquetIO : public ::testing::Test {
       std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
     arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
     std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
-    ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
+    ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
     ASSERT_NE(nullptr, column_reader.get());
+
     ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
     ASSERT_NE(nullptr, out->get());
   }
 
+  void ReadAndCheckSingleColumnFile(Array* values) {
+    std::shared_ptr<Array> out;
+    ReadSingleColumnFile(ReaderFromSink(), &out);
+    ASSERT_TRUE(values->Equals(out));
+  }
+
   void ReadTableFromFile(
       std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
     arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
-    ASSERT_NO_THROW(ASSERT_OK(reader.ReadFlatTable(out)));
+    ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
     ASSERT_NE(nullptr, out->get());
   }
 
-  std::unique_ptr<ParquetFileReader> TestFile(std::vector<T>& values, int num_chunks) {
-    std::shared_ptr<GroupNode> schema =
-        MakeSchema(test_traits<TestType>::parquet_enum, Repetition::REQUIRED);
-    std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema);
-    size_t chunk_size = values.size() / num_chunks;
-    for (int i = 0; i < num_chunks; i++) {
-      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
-      auto column_writer =
-          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
-      T* data = values.data() + i * chunk_size;
-      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
-      column_writer->Close();
-      row_group_writer->Close();
-    }
-    file_writer->Close();
-    return ReaderFromSink();
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
+    std::shared_ptr<Table> out;
+    ReadTableFromFile(ReaderFromSink(), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(values->length(), out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+  }
+
+  template <typename ArrayType>
+  void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+      const std::shared_ptr<ArrayType>& values) {
+    FileWriter writer(default_memory_pool(), MakeWriter(schema));
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+    ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+    ASSERT_OK_NO_THROW(writer.Close());
   }
 
   std::shared_ptr<InMemoryOutputStream> sink_;
 };
 
-typedef ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType> TestTypes;
-
-TYPED_TEST_CASE(TestParquetIO, TestTypes);
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredRead) {
-  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1);
-
-  std::shared_ptr<Array> out;
-  this->ReadSingleColumnFile(std::move(file_reader), &out);
-
-  ExpectArray<typename TypeParam::c_type>(values.data(), out.get());
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredTableRead) {
-  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1);
-
-  std::shared_ptr<Table> out;
-  this->ReadTableFromFile(std::move(file_reader), &out);
-  ASSERT_EQ(1, out->num_columns());
-  ASSERT_EQ(SMALL_SIZE, out->num_rows());
-
-  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-  ASSERT_EQ(1, chunked_array->num_chunks());
-  ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get());
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedRead) {
-  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4);
-
-  std::shared_ptr<Array> out;
-  this->ReadSingleColumnFile(std::move(file_reader), &out);
+// We have separate tests for UInt32Type as it is currently the only type
+// where a roundtrip does not yield an identical Array structure: for
+// Parquet version 1.0 we write a UInt32 Array but read back an Int64 Array.
 
-  ExpectArray<typename TypeParam::c_type>(values.data(), out.get());
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedTableRead) {
-  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4);
-
-  std::shared_ptr<Table> out;
-  this->ReadTableFromFile(std::move(file_reader), &out);
-  ASSERT_EQ(1, out->num_columns());
-  ASSERT_EQ(SMALL_SIZE, out->num_rows());
+typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
+    Int32Type, UInt64Type, Int64Type, FloatType, DoubleType, StringType> TestTypes;
 
-  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-  ASSERT_EQ(1, chunked_array->num_chunks());
-  ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get());
-}
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
 
 TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
-  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
 
-  std::shared_ptr<GroupNode> schema =
-      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
-  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
-  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
-  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
-  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  this->WriteFlatColumn(schema, values);
 
-  std::shared_ptr<Array> out;
-  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
-  ASSERT_TRUE(values->Equals(out));
+  this->ReadAndCheckSingleColumnFile(values.get());
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
-  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_NO_THROW(ASSERT_OK(
-      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_,
+      values->length(), default_writer_properties()));
 
   std::shared_ptr<Table> out;
   this->ReadTableFromFile(this->ReaderFromSink(), &out);
@@ -226,113 +280,208 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
   // This also tests max_definition_level = 1
-  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
 
-  std::shared_ptr<GroupNode> schema =
-      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
-  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
-  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
-  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
-  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  this->WriteFlatColumn(schema, values);
 
-  std::shared_ptr<Array> out;
-  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
-  ASSERT_TRUE(values->Equals(out));
+  this->ReadAndCheckSingleColumnFile(values.get());
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
   // This also tests max_definition_level = 1
-  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
+  std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_NO_THROW(ASSERT_OK(
-      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
-
-  std::shared_ptr<Table> out;
-  this->ReadTableFromFile(this->ReaderFromSink(), &out);
-  ASSERT_EQ(1, out->num_columns());
-  ASSERT_EQ(SMALL_SIZE, out->num_rows());
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_,
+      values->length(), default_writer_properties()));
 
-  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-  ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+  this->ReadAndCheckSingleColumnTable(values);
 }
 
-TYPED_TEST(TestParquetIO, SingleColumnIntRequiredChunkedWrite) {
-  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
-  std::shared_ptr<PrimitiveArray> values_chunk =
-      NonNullArray<TypeParam>(SMALL_SIZE / 4, 128);
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  int64_t chunk_size = values->length() / 4;
 
-  std::shared_ptr<GroupNode> schema =
-      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
   FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
-    ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
-    ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
   }
-  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+  ASSERT_OK_NO_THROW(writer.Close());
 
-  std::shared_ptr<Array> out;
-  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
-  ASSERT_TRUE(values->Equals(out));
+  this->ReadAndCheckSingleColumnFile(values.get());
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
-  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(LARGE_SIZE, 128);
+  auto values = NonNullArray<TypeParam>(LARGE_SIZE);
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_NO_THROW(
-      ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
-
-  std::shared_ptr<Table> out;
-  this->ReadTableFromFile(this->ReaderFromSink(), &out);
-  ASSERT_EQ(1, out->num_columns());
-  ASSERT_EQ(LARGE_SIZE, out->num_rows());
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
 
-  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-  ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+  this->ReadAndCheckSingleColumnTable(values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
-  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
-  std::shared_ptr<PrimitiveArray> values_chunk_nulls =
-      NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 10);
-  std::shared_ptr<PrimitiveArray> values_chunk =
-      NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 0);
-
-  std::shared_ptr<GroupNode> schema =
-      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
+  int64_t chunk_size = SMALL_SIZE / 4;
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
   FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
-  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length())));
-  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get())));
-  for (int i = 0; i < 3; i++) {
-    ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
-    ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
   }
-  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+  ASSERT_OK_NO_THROW(writer.Close());
 
-  std::shared_ptr<Array> out;
-  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
-  ASSERT_TRUE(values->Equals(out));
+  this->ReadAndCheckSingleColumnFile(values.get());
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
   // This also tests max_definition_level = 1
-  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(LARGE_SIZE, 128, 100);
+  auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_NO_THROW(
-      ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
 
-  std::shared_ptr<Table> out;
-  this->ReadTableFromFile(this->ReaderFromSink(), &out);
-  ASSERT_EQ(1, out->num_columns());
-  ASSERT_EQ(LARGE_SIZE, out->num_rows());
+  this->ReadAndCheckSingleColumnTable(values);
+}
 
-  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-  ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+using TestUInt32ParquetIO = TestParquetIO<UInt32Type>;
+
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compatibility) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // A Parquet 2.0 roundtrip should yield a uint32_t column again
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_2_0)
+          ->build();
+  ASSERT_OK_NO_THROW(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
+  // reader that a column is unsigned.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_1_0)
+          ->build();
+  ASSERT_OK_NO_THROW(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+
+  std::shared_ptr<Array> expected_values;
+  std::shared_ptr<PoolBuffer> int64_data =
+      std::make_shared<PoolBuffer>(default_memory_pool());
+  {
+    ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
+    int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+    const uint32_t* uint32_data_ptr =
+        reinterpret_cast<const uint32_t*>(values->data()->data());
+    // std::copy might be faster but this is explicit about the casts
+    for (int64_t i = 0; i < values->length(); i++) {
+      int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
+    }
+  }
+  ASSERT_OK(MakePrimitiveArray(std::make_shared<Int64Type>(), values->length(),
+      int64_data, values->null_count(), values->null_bitmap(), &expected_values));
+  this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+template <typename T>
+using ParquetCDataType = typename ParquetDataType<T>::c_type;
+
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+  typedef typename TestType::c_type T;
+
+  void TestFile(std::vector<T>& values, int num_chunks,
+      std::unique_ptr<ParquetFileReader>* file_reader) {
+    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+    std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+    size_t chunk_size = values.size() / num_chunks;
+    // Convert to Parquet's expected physical type
+    std::vector<uint8_t> values_buffer(
+        sizeof(ParquetCDataType<TestType>) * values.size());
+    auto values_parquet =
+        reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+    std::copy(values.cbegin(), values.cend(), values_parquet);
+    for (int i = 0; i < num_chunks; i++) {
+      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+      auto column_writer =
+          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+      ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
+      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+      column_writer->Close();
+      row_group_writer->Close();
+    }
+    file_writer->Close();
+    *file_reader = this->ReaderFromSink();
+  }
+
+  void TestSingleColumnRequiredTableRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(TestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Table> out;
+    this->ReadTableFromFile(std::move(file_reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
+  }
+
+  void TestSingleColumnRequiredRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(TestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Array> out;
+    this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+    ExpectArray<TestType>(values.data(), out.get());
+  }
+};
+
+typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
+    UInt32Type, Int32Type, UInt64Type, Int64Type, FloatType,
+    DoubleType> PrimitiveTestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+  this->TestSingleColumnRequiredRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+  this->TestSingleColumnRequiredTableRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+  this->TestSingleColumnRequiredRead(4);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+  this->TestSingleColumnRequiredTableRead(4);
 }
 
 }  // namespace parquet

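A side note on the assertion change running through this file: the nested ASSERT_NO_THROW(ASSERT_OK(...)) pattern is replaced by a single ASSERT_OK_NO_THROW macro, presumably defined in cpp/src/arrow/test-util.h (touched per the summary above); its exact definition is not reproduced in this excerpt. A plausible minimal definition, offered only as an assumption:

    // Assumption: not the verbatim definition from arrow/test-util.h,
    // just one way to combine the two gtest checks it replaces.
    #define ASSERT_OK_NO_THROW(expr)   \
      ASSERT_NO_THROW({                \
        ::arrow::Status _s = (expr);   \
        ASSERT_TRUE(_s.ok());          \
      })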
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/parquet-schema-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc
index 8de7394..819cdd3 100644
--- a/cpp/src/arrow/parquet/parquet-schema-test.cc
+++ b/cpp/src/arrow/parquet/parquet-schema-test.cc
@@ -183,7 +183,9 @@ class TestConvertArrowSchema : public ::testing::Test {
 
   Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
     arrow_schema_ = std::make_shared<Schema>(fields);
-    return ToParquetSchema(arrow_schema_.get(), &result_schema_);
+    std::shared_ptr<::parquet::WriterProperties> properties =
+        ::parquet::default_writer_properties();
+    return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_);
   }
 
  protected:

http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index 3b4882d..7b05665 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/parquet/reader.h"
 
+#include <algorithm>
 #include <queue>
 #include <string>
 #include <vector>
@@ -27,6 +28,7 @@
 #include "arrow/schema.h"
 #include "arrow/table.h"
 #include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
 #include "arrow/util/status.h"
 
 using parquet::ColumnReader;
@@ -36,6 +38,19 @@ using parquet::TypedColumnReader;
 namespace arrow {
 namespace parquet {
 
+template <typename ArrowType>
+struct ArrowTypeTraits {
+  typedef NumericBuilder<ArrowType> builder_type;
+};
+
+template <>
+struct ArrowTypeTraits<BooleanType> {
+  typedef BooleanBuilder builder_type;
+};
+
+template <typename ArrowType>
+using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;
+
 class FileReader::Impl {
  public:
   Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
@@ -61,9 +76,45 @@ class FlatColumnReader::Impl {
   template <typename ArrowType, typename ParquetType>
   Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
 
+  template <typename ArrowType, typename ParquetType>
+  Status ReadNullableFlatBatch(const int16_t* def_levels,
+      typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+      BuilderType<ArrowType>* builder);
+  template <typename ArrowType, typename ParquetType>
+  Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
+      BuilderType<ArrowType>* builder);
+
  private:
   void NextRowGroup();
 
+  template <typename InType, typename OutType>
+  struct can_copy_ptr {
+    static constexpr bool value =
+        std::is_same<InType, OutType>::value ||
+        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+            (sizeof(InType) == sizeof(OutType)));
+  };
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+    return Status::OK();
+  }
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
+    OutType* mutable_out_ptr =
+        reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
+    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+    *out_ptr = mutable_out_ptr;
+    return Status::OK();
+  }
+
   MemoryPool* pool_;
   const ::parquet::ColumnDescriptor* descr_;
   ::parquet::ParquetFileReader* reader_;
@@ -156,12 +207,52 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor
 }
 
 template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
+    int64_t values_read, BuilderType<ArrowType>* builder) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  DCHECK(builder);
+  const ArrowCType* values_ptr;
+  RETURN_NOT_OK(
+      (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr)));
+  RETURN_NOT_OK(builder->Append(values_ptr, values_read));
+  return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
+    typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+    BuilderType<ArrowType>* builder) {
+  using ArrowCType = typename ArrowType::c_type;
+
+  DCHECK(builder);
+  RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
+  RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+  auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
+  uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+  int values_idx = 0;
+  for (int64_t i = 0; i < levels_read; i++) {
+    if (def_levels[i] < descr_->max_definition_level()) {
+      valid_bytes[i] = 0;
+    } else {
+      valid_bytes[i] = 1;
+      values_ptr[i] = values[values_idx++];
+    }
+  }
+  RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
+  return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
 Status FlatColumnReader::Impl::TypedReadBatch(
     int batch_size, std::shared_ptr<Array>* out) {
+  using ParquetCType = typename ParquetType::c_type;
+
   int values_to_read = batch_size;
-  NumericBuilder<ArrowType> builder(pool_, field_->type);
+  BuilderType<ArrowType> builder(pool_, field_->type);
   while ((values_to_read > 0) && column_reader_) {
-    values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type));
+    values_buffer_.Resize(values_to_read * sizeof(ParquetCType));
     if (descr_->max_definition_level() > 0) {
       def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
     }
@@ -169,31 +260,62 @@ Status FlatColumnReader::Impl::TypedReadBatch(
     int64_t values_read;
     int64_t levels_read;
     int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-    auto values =
-        reinterpret_cast<typename ParquetType::c_type*>(values_buffer_.mutable_data());
+    auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
     PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
                              values_to_read, def_levels, nullptr, values, &values_read));
     values_to_read -= levels_read;
     if (descr_->max_definition_level() == 0) {
-      RETURN_NOT_OK(builder.Append(values, values_read));
+      RETURN_NOT_OK(
+          (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
+    } else {
+      // As per the definition and checks for flat columns:
+      // descr_->max_definition_level() == 1
+      RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
+          def_levels, values, values_read, levels_read, &builder)));
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+  *out = builder.Finish();
+  return Status::OK();
+}
+
+template <>
+Status FlatColumnReader::Impl::TypedReadBatch<StringType, ::parquet::ByteArrayType>(
+    int batch_size, std::shared_ptr<Array>* out) {
+  int values_to_read = batch_size;
+  StringBuilder builder(pool_, field_->type);
+  while ((values_to_read > 0) && column_reader_) {
+    values_buffer_.Resize(values_to_read * sizeof(::parquet::ByteArray));
+    if (descr_->max_definition_level() > 0) {
+      def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+    }
+    auto reader =
+        dynamic_cast<TypedColumnReader<::parquet::ByteArrayType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values = reinterpret_cast<::parquet::ByteArray*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, &values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      for (int64_t i = 0; i < levels_read; i++) {
+        RETURN_NOT_OK(
+            builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
+      }
     } else {
       // descr_->max_definition_level() == 1
-      RETURN_NOT_OK(values_builder_buffer_.Resize(
-          levels_read * sizeof(typename ParquetType::c_type)));
-      RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
-      auto values_ptr = reinterpret_cast<typename ParquetType::c_type*>(
-          values_builder_buffer_.mutable_data());
-      uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
       int values_idx = 0;
       for (int64_t i = 0; i < levels_read; i++) {
         if (def_levels[i] < descr_->max_definition_level()) {
-          valid_bytes[i] = 0;
+          RETURN_NOT_OK(builder.AppendNull());
         } else {
-          valid_bytes[i] = 1;
-          values_ptr[i] = values[values_idx++];
+          RETURN_NOT_OK(
+              builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
+                  values[values_idx].len));
+          values_idx++;
         }
       }
-      builder.Append(values_ptr, levels_read, valid_bytes);
     }
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
@@ -214,10 +336,18 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
   }
 
   switch (field_->type->type) {
+    TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType)
+    TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(UINT32, UInt32Type, ::parquet::Int32Type)
     TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type)
     TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
     TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
     TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
+    TYPED_BATCH_CASE(STRING, StringType, ::parquet::ByteArrayType)
     default:
       return Status::NotImplemented(field_->type->ToString());
   }

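The reader now converts from the Parquet physical types (INT32/INT64) to the narrower or differently signed Arrow types via ConvertPhysicalType, which selects one of two overloads at compile time using the can_copy_ptr trait shown above. A self-contained sketch of that dispatch:

    // Standalone illustration of the compile-time dispatch behind
    // ConvertPhysicalType; the static_asserts document the intent.
    #include <cstdint>
    #include <type_traits>

    template <typename In, typename Out>
    struct can_copy_ptr {
      static constexpr bool value = std::is_same<In, Out>::value ||
          (std::is_integral<In>{} && std::is_integral<Out>{} &&
              sizeof(In) == sizeof(Out));
    };

    // Same width: the Parquet buffer can be reinterpreted in place.
    static_assert(can_copy_ptr<int32_t, uint32_t>::value, "zero-copy path");
    // Different width, e.g. Arrow int16 read from Parquet INT32,
    // requires an element-wise copy into a temporary buffer.
    static_assert(!can_copy_ptr<int32_t, int16_t>::value, "copy path");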
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc
index c7979db..a79342a 100644
--- a/cpp/src/arrow/parquet/schema.cc
+++ b/cpp/src/arrow/parquet/schema.cc
@@ -42,7 +42,12 @@ namespace parquet {
 
 const auto BOOL = std::make_shared<BooleanType>();
 const auto UINT8 = std::make_shared<UInt8Type>();
+const auto INT8 = std::make_shared<Int8Type>();
+const auto UINT16 = std::make_shared<UInt16Type>();
+const auto INT16 = std::make_shared<Int16Type>();
+const auto UINT32 = std::make_shared<UInt32Type>();
 const auto INT32 = std::make_shared<Int32Type>();
+const auto UINT64 = std::make_shared<UInt64Type>();
 const auto INT64 = std::make_shared<Int64Type>();
 const auto FLOAT = std::make_shared<FloatType>();
 const auto DOUBLE = std::make_shared<DoubleType>();
@@ -92,6 +97,21 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
     case LogicalType::NONE:
       *out = INT32;
       break;
+    case LogicalType::UINT_8:
+      *out = UINT8;
+      break;
+    case LogicalType::INT_8:
+      *out = INT8;
+      break;
+    case LogicalType::UINT_16:
+      *out = UINT16;
+      break;
+    case LogicalType::INT_16:
+      *out = INT16;
+      break;
+    case LogicalType::UINT_32:
+      *out = UINT32;
+      break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
@@ -107,6 +127,9 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
     case LogicalType::NONE:
       *out = INT64;
       break;
+    case LogicalType::UINT_64:
+      *out = UINT64;
+      break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
@@ -187,20 +210,21 @@ Status FromParquetSchema(
 }
 
 Status StructToNode(const std::shared_ptr<StructType>& type, const std::string& name,
-    bool nullable, NodePtr* out) {
+    bool nullable, const ::parquet::WriterProperties& properties, NodePtr* out) {
   Repetition::type repetition = Repetition::REQUIRED;
   if (nullable) { repetition = Repetition::OPTIONAL; }
 
   std::vector<NodePtr> children(type->num_children());
   for (int i = 0; i < type->num_children(); i++) {
-    RETURN_NOT_OK(FieldToNode(type->child(i), &children[i]));
+    RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
   }
 
   *out = GroupNode::Make(name, repetition, children);
   return Status::OK();
 }
 
-Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
+Status FieldToNode(const std::shared_ptr<Field>& field,
+    const ::parquet::WriterProperties& properties, NodePtr* out) {
   LogicalType::type logical_type = LogicalType::NONE;
   ParquetType::type type;
   Repetition::type repetition = Repetition::REQUIRED;
@@ -231,8 +255,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
       logical_type = LogicalType::INT_16;
       break;
     case Type::UINT32:
-      type = ParquetType::INT32;
-      logical_type = LogicalType::UINT_32;
+      if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
+        type = ParquetType::INT64;
+      } else {
+        type = ParquetType::INT32;
+        logical_type = LogicalType::UINT_32;
+      }
       break;
     case Type::INT32:
       type = ParquetType::INT32;
@@ -277,7 +305,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
       break;
     case Type::STRUCT: {
       auto struct_type = std::static_pointer_cast<StructType>(field->type);
-      return StructToNode(struct_type, field->name, field->nullable, out);
+      return StructToNode(struct_type, field->name, field->nullable, properties, out);
     } break;
     default:
       // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
@@ -287,11 +315,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
   return Status::OK();
 }
 
-Status ToParquetSchema(
-    const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out) {
+Status ToParquetSchema(const Schema* arrow_schema,
+    const ::parquet::WriterProperties& properties,
+    std::shared_ptr<::parquet::SchemaDescriptor>* out) {
   std::vector<NodePtr> nodes(arrow_schema->num_fields());
   for (int i = 0; i < arrow_schema->num_fields(); i++) {
-    RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), &nodes[i]));
+    RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
   }
 
   NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);

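The net effect of the version-aware conversion: Parquet 1.0 has no UINT_32 annotation, so Arrow uint32 columns are widened to physical INT64, while under Parquet 2.0 they map to INT32 with LogicalType::UINT_32. A sketch of driving the new ToParquetSchema signature, mirroring the test change in parquet-schema-test.cc above (arrow_schema is assumed to hold a uint32 field):

    std::shared_ptr<::parquet::WriterProperties> properties =
        ::parquet::WriterProperties::Builder()
            .version(::parquet::ParquetVersion::PARQUET_1_0)
            ->build();
    std::shared_ptr<::parquet::SchemaDescriptor> result;
    // With PARQUET_1_0 the field comes out as physical INT64;
    // with PARQUET_2_0 it would be INT32 annotated UINT_32.
    RETURN_NOT_OK(ToParquetSchema(arrow_schema.get(), *properties, &result));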
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/schema.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h
index ec5f960..39bee05 100644
--- a/cpp/src/arrow/parquet/schema.h
+++ b/cpp/src/arrow/parquet/schema.h
@@ -21,6 +21,7 @@
 #include <memory>
 
 #include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
 
 #include "arrow/schema.h"
 #include "arrow/type.h"
@@ -36,10 +37,12 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr<Field
 Status FromParquetSchema(
     const ::parquet::SchemaDescriptor* parquet_schema, std::shared_ptr<Schema>* out);
 
-Status FieldToNode(const std::shared_ptr<Field>& field, ::parquet::schema::NodePtr* out);
+Status FieldToNode(const std::shared_ptr<Field>& field,
+    const ::parquet::WriterProperties& properties, ::parquet::schema::NodePtr* out);
 
-Status ToParquetSchema(
-    const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out);
+Status ToParquetSchema(const Schema* arrow_schema,
+    const ::parquet::WriterProperties& properties,
+    std::shared_ptr<::parquet::SchemaDescriptor>* out);
 
 }  // namespace parquet
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/test-util.h b/cpp/src/arrow/parquet/test-util.h
index cc8723b..68a7fb9 100644
--- a/cpp/src/arrow/parquet/test-util.h
+++ b/cpp/src/arrow/parquet/test-util.h
@@ -18,26 +18,90 @@
 #include <string>
 #include <vector>
 
+#include "arrow/test-util.h"
 #include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
 
 namespace arrow {
 
 namespace parquet {
 
 template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NonNullArray(
-    size_t size, typename ArrowType::c_type value) {
-  std::vector<typename ArrowType::c_type> values(size, value);
+using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
+
+template <typename ArrowType>
+using is_arrow_string = std::is_same<ArrowType, StringType>;
+
+template <class ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
   NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
   builder.Append(values.data(), values.size());
   return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
 }
 
-// This helper function only supports (size/2) nulls yet.
+template <class ArrowType>
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<PrimitiveArray>>::type
+NonNullArray(size_t size) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+template <class ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<StringArray>>::type
+NonNullArray(size_t size) {
+  StringBuilder builder(default_memory_pool(), std::make_shared<StringType>());
+  for (size_t i = 0; i < size; i++) {
+    builder.Append("test-string");
+  }
+  return std::static_pointer_cast<StringArray>(builder.Finish());
+}
+
+template <>
+std::shared_ptr<PrimitiveArray> NonNullArray<BooleanType>(size_t size) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  BooleanBuilder builder(default_memory_pool(), std::make_shared<BooleanType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_float<ArrowType>::value,
+    std::shared_ptr<PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls.
 template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NullableArray(
-    size_t size, typename ArrowType::c_type value, size_t num_nulls) {
-  std::vector<typename ArrowType::c_type> values(size, value);
+typename std::enable_if<is_arrow_int<ArrowType>::value,
+    std::shared_ptr<PrimitiveArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
@@ -49,14 +113,49 @@ std::shared_ptr<PrimitiveArray> NullableArray(
   return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
 }
 
-std::shared_ptr<Column> MakeColumn(const std::string& name,
-    const std::shared_ptr<PrimitiveArray>& array, bool nullable) {
+// This helper function currently supports at most (size/2) nulls.
+template <typename ArrowType>
+typename std::enable_if<is_arrow_string<ArrowType>::value,
+    std::shared_ptr<StringArray>>::type
+NullableArray(size_t size, size_t num_nulls) {
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  StringBuilder builder(default_memory_pool(), std::make_shared<StringType>());
+  for (size_t i = 0; i < size; i++) {
+    builder.Append("test-string");
+  }
+  return std::static_pointer_cast<StringArray>(builder.Finish());
+}
+
+// This helper function currently supports at most (size/2) nulls.
+template <>
+std::shared_ptr<PrimitiveArray> NullableArray<BooleanType>(
+    size_t size, size_t num_nulls) {
+  std::vector<uint8_t> values;
+  ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  BooleanBuilder builder(default_memory_pool(), std::make_shared<BooleanType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+std::shared_ptr<Column> MakeColumn(
+    const std::string& name, const std::shared_ptr<Array>& array, bool nullable) {
   auto field = std::make_shared<Field>(name, array->type(), nullable);
   return std::make_shared<Column>(field, array);
 }
 
 std::shared_ptr<Table> MakeSimpleTable(
-    const std::shared_ptr<PrimitiveArray>& values, bool nullable) {
+    const std::shared_ptr<Array>& values, bool nullable) {
   std::shared_ptr<Column> column = MakeColumn("col", values, nullable);
   std::vector<std::shared_ptr<Column>> columns({column});
   std::vector<std::shared_ptr<Field>> fields({column->field()});
@@ -72,6 +171,23 @@ void ExpectArray(T* expected, Array* result) {
   }
 }
 
+template <typename ArrowType>
+void ExpectArray(typename ArrowType::c_type* expected, Array* result) {
+  PrimitiveArray* p_array = static_cast<PrimitiveArray*>(result);
+  for (int64_t i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i],
+        reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
+  }
+}
+
+template <>
+void ExpectArray<BooleanType>(uint8_t* expected, Array* result) {
+  BooleanBuilder builder(default_memory_pool(), std::make_shared<BooleanType>());
+  builder.Append(expected, result->length());
+  std::shared_ptr<Array> expected_array = builder.Finish();
+  EXPECT_TRUE(result->Equals(expected_array));
+}
+
 }  // namespace parquet
 
 }  // namespace arrow

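The generator helpers above pick an implementation per Arrow type with enable_if rather than runtime branching. A minimal, self-contained sketch of that selection pattern (the names here are illustrative, not from the codebase):

    #include <cstdint>
    #include <type_traits>

    struct FloatLikeType { using c_type = double; };   // stand-in for FloatType
    struct IntLikeType { using c_type = int32_t; };    // stand-in for Int32Type

    template <typename T>
    using is_float_like = std::is_floating_point<typename T::c_type>;

    // Exactly one overload survives substitution for a given T.
    template <typename T>
    typename std::enable_if<is_float_like<T>::value, const char*>::type
    GeneratorName() { return "random_real"; }

    template <typename T>
    typename std::enable_if<!is_float_like<T>::value, const char*>::type
    GeneratorName() { return "randint"; }

    // GeneratorName<FloatLikeType>() yields "random_real",
    // GeneratorName<IntLikeType>() yields "randint".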
http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
index 4005e3b..63449bb 100644
--- a/cpp/src/arrow/parquet/writer.cc
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -25,11 +25,13 @@
 #include "arrow/table.h"
 #include "arrow/types/construct.h"
 #include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
 #include "arrow/parquet/schema.h"
 #include "arrow/parquet/utils.h"
 #include "arrow/util/status.h"
 
 using parquet::ParquetFileWriter;
+using parquet::ParquetVersion;
 using parquet::schema::GroupNode;
 
 namespace arrow {
@@ -41,10 +43,40 @@ class FileWriter::Impl {
   Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
 
   Status NewRowGroup(int64_t chunk_size);
-  template <typename ParquetType>
+  template <typename ParquetType, typename ArrowType>
   Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data,
       int64_t offset, int64_t length);
+
+  // TODO(uwe): Same code as in reader.cc; the only difference is the name
+  // of the temporary buffer
+  template <typename InType, typename OutType>
+  struct can_copy_ptr {
+    static constexpr bool value =
+        std::is_same<InType, OutType>::value ||
+        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
+            (sizeof(InType) == sizeof(OutType)));
+  };
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) {
+    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+    return Status::OK();
+  }
+
+  template <typename InType, typename OutType,
+      typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
+    OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
+    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+    *out_ptr = mutable_out_ptr;
+    return Status::OK();
+  }
+
   Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
+  Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length);
   Status Close();
 
   virtual ~Impl() {}
@@ -53,6 +85,8 @@ class FileWriter::Impl {
   friend class FileWriter;
 
   MemoryPool* pool_;
+  // Buffer used for storing the data of an array converted to the physical type
+  // as expected by parquet-cpp.
   PoolBuffer data_buffer_;
   PoolBuffer def_levels_buffer_;
   std::unique_ptr<::parquet::ParquetFileWriter> writer_;
@@ -72,36 +106,95 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
   return Status::OK();
 }
 
-template <typename ParquetType>
+template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
     const PrimitiveArray* data, int64_t offset, int64_t length) {
-  // TODO: DCHECK((offset + length) <= data->length());
-  auto data_ptr =
-      reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data()) +
-      offset;
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  DCHECK((offset + length) <= data->length());
+  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
   auto writer =
       reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
   if (writer->descr()->max_definition_level() == 0) {
     // no nulls, just dump the data
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_ptr));
+    const ParquetCType* data_writer_ptr;
+    RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+        data_ptr, length, &data_writer_ptr)));
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr));
   } else if (writer->descr()->max_definition_level() == 1) {
     RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
     int16_t* def_levels_ptr =
         reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
     if (data->null_count() == 0) {
       std::fill(def_levels_ptr, def_levels_ptr + length, 1);
-      PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, data_ptr));
+      const ParquetCType* data_writer_ptr;
+      RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
+          data_ptr, length, &data_writer_ptr)));
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
     } else {
-      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(typename ParquetType::c_type)));
-      auto buffer_ptr =
-          reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data());
+      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
+      auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
       int buffer_idx = 0;
       for (int i = 0; i < length; i++) {
         if (data->IsNull(offset + i)) {
           def_levels_ptr[i] = 0;
         } else {
           def_levels_ptr[i] = 1;
-          buffer_ptr[buffer_idx++] = data_ptr[i];
+          buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
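The null branch above is where Arrow validity information is translated into Parquet definition levels: level 1 marks a present value, level 0 a null, and only the non-null values land in the dense values buffer. A minimal illustration of the mapping, not part of the patch, with std::vector standing in for the pool buffers:

    #include <cstdint>
    #include <vector>

    // Nullable flat column: def level 1 marks a value, 0 marks a null; only
    // non-null values are appended to the dense values buffer.
    void ComputeDefLevels(const std::vector<int32_t>& values,
        const std::vector<bool>& is_valid, std::vector<int16_t>* def_levels,
        std::vector<int32_t>* dense_values) {
      for (size_t i = 0; i < values.size(); ++i) {
        def_levels->push_back(is_valid[i] ? 1 : 0);
        if (is_valid[i]) { dense_values->push_back(values[i]); }
      }
    }

    // values = {7, 0, 9} with is_valid = {true, false, true} yields
    // def_levels = {1, 0, 1} and dense_values = {7, 9}.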
+
+// This specialization looks quite similar to the generic version above, but it
+// differs in two important points:
+// * offset is applied to the data pointer as late as possible, since we only
+//   have sub-byte (bit-level) access to the values
+// * Arrow stores boolean data bit-packed, so we cannot use std::copy to
+//   convert from ArrowType::c_type to ParquetType::c_type
+template <>
+Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, BooleanType>(
+    ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
+    int64_t length) {
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length));
+  auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
+  auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
+  auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::BooleanType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = util::get_bit(data_ptr, offset + i);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      for (int64_t i = 0; i < length; i++) {
+        buffer_ptr[i] = util::get_bit(data_ptr, offset + i);
+      }
+      // TODO(PARQUET-644): write boolean values as a packed bitmap
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+    } else {
+      int buffer_idx = 0;
+      for (int i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = util::get_bit(data_ptr, offset + i);
         }
       }
       PARQUET_CATCH_NOT_OK(
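As background for the boolean specialization: Arrow bit-packs booleans (one bit per value), while parquet-cpp's WriteBatch takes one bool per value, hence the unpacking via util::get_bit. A self-contained sketch of that unpacking, assuming the LSB-first bit layout of arrow/util/bit-util.h:

    #include <cstdint>

    // Assumed LSB-first layout: bit i lives in byte i / 8 at position i % 8.
    inline bool GetBit(const uint8_t* bits, int64_t i) {
      return (bits[i / 8] >> (i % 8)) & 1;
    }

    // Unpack a bitmap slice into the byte-per-value bools WriteBatch expects.
    inline void UnpackBooleans(
        const uint8_t* bits, int64_t offset, int64_t length, bool* out) {
      for (int64_t i = 0; i < length; ++i) {
        out[i] = GetBit(bits, offset + i);
      }
    }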
@@ -120,9 +213,9 @@ Status FileWriter::Impl::Close() {
   return Status::OK();
 }
 
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                 \
-  case Type::ENUM:                                                     \
-    return TypedWriteBatch<ParquetType>(writer, data, offset, length); \
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                            \
+  case Type::ENUM:                                                                \
+    return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
     break;
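To make the macro concrete, an invocation such as TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type) expands to roughly the following case fragment:

    case Type::UINT8:
      return TypedWriteBatch<::parquet::Int32Type, UInt8Type>(
          writer, data, offset, length);
      break;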
 
 Status FileWriter::Impl::WriteFlatColumnChunk(
@@ -130,15 +223,76 @@ Status FileWriter::Impl::WriteFlatColumnChunk(
   ::parquet::ColumnWriter* writer;
   PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
   switch (data->type_enum()) {
-    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
-    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
-    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
-    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
+    TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType)
+    TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type)
+    case Type::UINT32:
+      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
+        // Parquet 1.0 readers cannot read the UINT_32 logical type. Thus we
+        // need to use the larger Int64Type to store the values losslessly.
+        return TypedWriteBatch<::parquet::Int64Type, UInt32Type>(
+            writer, data, offset, length);
+      } else {
+        return TypedWriteBatch<::parquet::Int32Type, UInt32Type>(
+            writer, data, offset, length);
+      }
+    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type)
+    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
+    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
+    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
     default:
       return Status::NotImplemented(data->type()->ToString());
   }
 }
 
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const StringArray* data, int64_t offset, int64_t length) {
+  ::parquet::ColumnWriter* column_writer;
+  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+  DCHECK((offset + length) <= data->length());
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(::parquet::ByteArray)));
+  auto buffer_ptr = reinterpret_cast<::parquet::ByteArray*>(data_buffer_.mutable_data());
+  auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values());
+  DCHECK(values != nullptr);
+  auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data());
+  auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::ByteArrayType>*>(
+      column_writer);
+  if (writer->descr()->max_definition_level() > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
+  }
+  int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) {
+    // no nulls, just dump the data
+    for (int64_t i = 0; i < length; i++) {
+      buffer_ptr[i] = ::parquet::ByteArray(
+          data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+    }
+    if (writer->descr()->max_definition_level() > 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    int buffer_idx = 0;
+    for (int64_t i = 0; i < length; i++) {
+      if (data->IsNull(offset + i)) {
+        def_levels_ptr[i] = 0;
+      } else {
+        def_levels_ptr[i] = 1;
+        buffer_ptr[buffer_idx++] = ::parquet::ByteArray(
+            data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+      }
+    }
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
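The loops above rely on Arrow's string layout: a contiguous character buffer plus int32 offsets, where value i spans [offsets[i], offsets[i + 1]). A sketch of how one value becomes a parquet::ByteArray-style (length, pointer) view; ByteArrayView is a hypothetical stand-in for the real type:

    #include <cstdint>

    struct ByteArrayView {  // stand-in for ::parquet::ByteArray
      uint32_t len;
      const uint8_t* ptr;
    };

    // offsets has num_values + 1 entries; value i spans
    // [offsets[i], offsets[i + 1]) in the character buffer.
    inline ByteArrayView ViewString(
        const uint8_t* chars, const int32_t* offsets, int64_t i) {
      return ByteArrayView{
          static_cast<uint32_t>(offsets[i + 1] - offsets[i]), chars + offsets[i]};
    }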
+
 FileWriter::FileWriter(
     MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
     : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
@@ -148,10 +302,20 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) {
 }
 
 Status FileWriter::WriteFlatColumnChunk(
-    const PrimitiveArray* data, int64_t offset, int64_t length) {
+    const Array* array, int64_t offset, int64_t length) {
   int64_t real_length = length;
-  if (length == -1) { real_length = data->length(); }
-  return impl_->WriteFlatColumnChunk(data, offset, real_length);
+  if (length == -1) { real_length = array->length(); }
+  if (array->type_enum() == Type::STRING) {
+    auto string_array = dynamic_cast<const StringArray*>(array);
+    DCHECK(string_array);
+    return impl_->WriteFlatColumnChunk(string_array, offset, real_length);
+  } else {
+    auto primitive_array = dynamic_cast<const PrimitiveArray*>(array);
+    if (!primitive_array) {
+      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+    }
+    return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length);
+  }
 }
 
 Status FileWriter::Close() {
@@ -165,40 +329,30 @@ MemoryPool* FileWriter::memory_pool() const {
 FileWriter::~FileWriter() {}
 
 Status WriteFlatTable(const Table* table, MemoryPool* pool,
-    std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) {
+    const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<::parquet::WriterProperties>& properties) {
   std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
-  RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema));
+  RETURN_NOT_OK(
+      ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
   auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema());
   std::unique_ptr<ParquetFileWriter> parquet_writer =
-      ParquetFileWriter::Open(sink, schema_node);
+      ParquetFileWriter::Open(sink, schema_node, properties);
   FileWriter writer(pool, std::move(parquet_writer));
 
-  // TODO: Support writing chunked arrays.
+  // TODO(ARROW-232): Support writing chunked arrays.
   for (int i = 0; i < table->num_columns(); i++) {
     if (table->column(i)->data()->num_chunks() != 1) {
       return Status::NotImplemented("No support for writing chunked arrays yet.");
     }
   }
 
-  // Cast to PrimitiveArray instances as we work with them.
-  std::vector<std::shared_ptr<PrimitiveArray>> arrays(table->num_columns());
-  for (int i = 0; i < table->num_columns(); i++) {
-    // num_chunks == 1 as per above loop
-    std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
-    auto primitive_array = std::dynamic_pointer_cast<PrimitiveArray>(array);
-    if (!primitive_array) {
-      PARQUET_IGNORE_NOT_OK(writer.Close());
-      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
-    }
-    arrays[i] = primitive_array;
-  }
-
   for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
     int64_t offset = chunk * chunk_size;
     int64_t size = std::min(chunk_size, table->num_rows() - offset);
     RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
     for (int i = 0; i < table->num_columns(); i++) {
-      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(arrays[i].get(), offset, size),
+      std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
+      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size),
           PARQUET_IGNORE_NOT_OK(writer.Close()));
     }
   }
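The row-group loop above splits the table into fixed-size chunks, with a shorter final chunk when num_rows is not a multiple of chunk_size. A runnable sketch of the same arithmetic:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    int main() {
      const int64_t num_rows = 10;
      const int64_t chunk_size = 4;
      // Produces row groups (offset, size) = (0, 4), (4, 4), (8, 2).
      for (int64_t chunk = 0; chunk * chunk_size < num_rows; ++chunk) {
        const int64_t offset = chunk * chunk_size;
        const int64_t size = std::min(chunk_size, num_rows - offset);
        std::printf("row group: offset=%d size=%d\n",
            static_cast<int>(offset), static_cast<int>(size));
      }
      return 0;
    }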

http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
index 93693f5..cfd80d8 100644
--- a/cpp/src/arrow/parquet/writer.h
+++ b/cpp/src/arrow/parquet/writer.h
@@ -25,10 +25,12 @@
 
 namespace arrow {
 
+class Array;
 class MemoryPool;
 class PrimitiveArray;
 class RowBatch;
 class Status;
+class StringArray;
 class Table;
 
 namespace parquet {
@@ -43,8 +45,7 @@ class FileWriter {
   FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
 
   Status NewRowGroup(int64_t chunk_size);
-  Status WriteFlatColumnChunk(
-      const PrimitiveArray* data, int64_t offset = 0, int64_t length = -1);
+  Status WriteFlatColumnChunk(const Array* data, int64_t offset = 0, int64_t length = -1);
   Status Close();
 
   virtual ~FileWriter();
@@ -62,7 +63,9 @@ class FileWriter {
  * The table shall only consist of nullable, non-repeated columns of primitive type.
  */
 Status WriteFlatTable(const Table* table, MemoryPool* pool,
-    std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size);
+    const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<::parquet::WriterProperties>& properties =
+        ::parquet::default_writer_properties());
 
 }  // namespace parquet
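A hedged usage sketch of the new optional properties parameter, using only the Builder methods declared in the Cython bindings below (version() and build()):

    ::parquet::WriterProperties::Builder builder;
    builder.version(::parquet::ParquetVersion::PARQUET_2_0);
    std::shared_ptr<::parquet::WriterProperties> properties = builder.build();
    // Then: WriteFlatTable(table, pool, sink, chunk_size, properties);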
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 2f81161..055dac7 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -50,6 +50,8 @@
     if (!s.ok()) { FAIL() << s.ToString(); } \
   } while (0)
 
+#define ASSERT_OK_NO_THROW(expr) ASSERT_NO_THROW(ASSERT_OK(expr))
+
 #define EXPECT_OK(expr)  \
   do {                   \
     Status s = (expr);   \

http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/types/primitive.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc
index 08fc847..f4b47f9 100644
--- a/cpp/src/arrow/types/primitive.cc
+++ b/cpp/src/arrow/types/primitive.cc
@@ -133,6 +133,11 @@ Status PrimitiveBuilder<BooleanType>::Append(
   RETURN_NOT_OK(Reserve(length));
 
   for (int i = 0; i < length; ++i) {
+    // Skip reading from uninitialized memory.
+    // TODO: This is primarily here to keep valgrind happy; it may or may not
+    // have a performance impact.
+    if ((valid_bytes != nullptr) && !valid_bytes[i]) continue;
+
     if (values[i] > 0) {
       util::set_bit(raw_data_, length_ + i);
     } else {
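With the guard added above, the backing memory of a null slot is never read; for example, with valid_bytes = {1, 0, 1}, values[1] may remain uninitialized without valgrind complaining. A minimal sketch of the pattern, with hypothetical names:

    #include <cstdint>
    #include <vector>

    // Only read values[i] when the slot is marked valid; null slots are
    // skipped entirely, so their backing memory may stay uninitialized.
    void AppendValid(const uint8_t* values, const uint8_t* valid_bytes,
        int32_t length, std::vector<bool>* out) {
      for (int32_t i = 0; i < length; ++i) {
        if (valid_bytes != nullptr && !valid_bytes[i]) {
          out->push_back(false);  // placeholder for a null slot
          continue;
        }
        out->push_back(values[i] > 0);
      }
    }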

http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index 0918344..a2f83ea 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -32,6 +32,10 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
     pass
 
 cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
+  enum ParquetVersion" parquet::ParquetVersion::type":
+      PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
+      PARQUET_2_0" parquet::ParquetVersion::PARQUET_2_0"
+
   cdef cppclass SchemaDescriptor:
     shared_ptr[Node] schema()
     GroupNode* group()
@@ -80,6 +84,11 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
         LocalFileOutputStream(const c_string& path)
         void Close()
 
+    cdef cppclass WriterProperties:
+        cppclass Builder:
+            Builder* version(ParquetVersion version)
+            shared_ptr[WriterProperties] build()
+
 
 cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil:
     cdef cppclass FileReader:
@@ -93,5 +102,7 @@ cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil:
 
 
 cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil:
-    cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, shared_ptr[OutputStream] sink, int64_t chunk_size)
+    cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool,
+            const shared_ptr[OutputStream]& sink, int64_t chunk_size,
+            const shared_ptr[WriterProperties]& properties)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 3d5355e..0b2b208 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -24,6 +24,7 @@ cimport pyarrow.includes.pyarrow as pyarrow
 from pyarrow.includes.parquet cimport *
 
 from pyarrow.compat import tobytes
+from pyarrow.error import ArrowException
 from pyarrow.error cimport check_cstatus
 from pyarrow.table cimport Table
 
@@ -42,11 +43,13 @@ def read_table(filename, columns=None):
     # in Cython (due to missing rvalue support)
     reader = unique_ptr[FileReader](new FileReader(default_memory_pool(),
         ParquetFileReader.OpenFile(tobytes(filename))))
-    check_cstatus(reader.get().ReadFlatTable(&ctable))
+    with nogil:
+        check_cstatus(reader.get().ReadFlatTable(&ctable))
+
     table.init(ctable)
     return table
 
-def write_table(table, filename, chunk_size=None):
+def write_table(table, filename, chunk_size=None, version=None):
     """
     Write a Table to Parquet format
 
@@ -56,16 +59,29 @@ def write_table(table, filename, chunk_size=None):
     filename : string
     chunk_size : int
         The maximum number of rows in each Parquet RowGroup
+    version : {"1.0", "2.0"}, default "1.0"
+        The Parquet format version to write
     """
     cdef Table table_ = table
     cdef CTable* ctable_ = table_.table
     cdef shared_ptr[OutputStream] sink
+    cdef WriterProperties.Builder properties_builder
     cdef int64_t chunk_size_ = 0
     if chunk_size is None:
         chunk_size_ = min(ctable_.num_rows(), int(2**16))
     else:
         chunk_size_ = chunk_size
 
+    if version is not None:
+        if version == "1.0":
+            properties_builder.version(PARQUET_1_0)
+        elif version == "2.0":
+            properties_builder.version(PARQUET_2_0)
+        else:
+            raise ArrowException("Unsupported Parquet format version")
+
     sink.reset(new LocalFileOutputStream(tobytes(filename)))
-    check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_))
+    with nogil:
+        check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink,
+            chunk_size_, properties_builder.build()))
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index d92cf4c..de9cfbb 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -42,18 +42,55 @@ def test_single_pylist_column_roundtrip(tmpdir):
             data_read = col_read.data.chunk(0)
             assert data_written.equals(data_read)
 
-def test_pandas_rountrip(tmpdir):
+def test_pandas_parquet_2_0_roundtrip(tmpdir):
     size = 10000
+    np.random.seed(0)
     df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int8),
+        'int16': np.arange(size, dtype=np.int16),
         'int32': np.arange(size, dtype=np.int32),
         'int64': np.arange(size, dtype=np.int64),
         'float32': np.arange(size, dtype=np.float32),
-        'float64': np.arange(size, dtype=np.float64)
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0,
+        'str': [str(x) for x in range(size)],
+        'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None]
     })
     filename = tmpdir.join('pandas_rountrip.parquet')
     arrow_table = A.from_pandas_dataframe(df)
-    A.parquet.write_table(arrow_table, filename.strpath)
+    A.parquet.write_table(arrow_table, filename.strpath, version="2.0")
     table_read = pyarrow.parquet.read_table(filename.strpath)
     df_read = table_read.to_pandas()
     pdt.assert_frame_equal(df, df_read)
 
+def test_pandas_parquet_1_0_roundtrip(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int8),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0
+    })
+    filename = tmpdir.join('pandas_roundtrip.parquet')
+    arrow_table = A.from_pandas_dataframe(df)
+    A.parquet.write_table(arrow_table, filename.strpath, version="1.0")
+    table_read = pyarrow.parquet.read_table(filename.strpath)
+    df_read = table_read.to_pandas()
+
+    # uint32 values are stored as int64 when writing Parquet version 1.0,
+    # so adjust the expected frame before comparing
+    df['uint32'] = df['uint32'].values.astype(np.int64)
+
+    pdt.assert_frame_equal(df, df_read)
+