Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/16 17:14:26 UTC

arrow git commit: ARROW-881: [Python] Reconstruct Pandas DataFrame indexes using metadata

Repository: arrow
Updated Branches:
  refs/heads/master e7e8d611c -> bed019743


ARROW-881: [Python] Reconstruct Pandas DataFrame indexes using metadata

cc @mrocklin

Author: Phillip Cloud <cp...@gmail.com>

Closes #612 from cpcloud/ARROW-881 and squashes the following commits:

4fa679d [Phillip Cloud] Add metadata test
60f71aa [Phillip Cloud] More doc
de616e8 [Phillip Cloud] Add doc
a42a084 [Phillip Cloud] Decode metadata to utf8 because JSON
2198dc5 [Phillip Cloud] Call column_name_idx on index_columns
32c5e64 [Phillip Cloud] Add test for read_pandas subset
2fa1f16 [Phillip Cloud] Do not write index_column metadata if not requested
21a8829 [Phillip Cloud] Add docs to pq.read_pandas
c35970c [Phillip Cloud] Add test for no index written and pq.read_pandas
59477b5 [Phillip Cloud] ARROW-881: [Python] Reconstruct Pandas DataFrame indexes using custom_metadata
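
To illustrate the behavior this patch adds (a minimal sketch using the APIs
introduced in this diff; exact output depends on the installed pandas and
pyarrow versions):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': [1, 2, 3]},
                      index=pd.Index([10, 20, 30], name='idx'))

    # The index levels are written as extra columns and described in JSON
    # stored under the b'pandas' key of the schema metadata
    # (see pandas_compat.construct_metadata below).
    table = pa.Table.from_pandas(df, preserve_index=True)

    # to_pandas() reads that metadata back and rebuilds the original index.
    roundtripped = table.to_pandas()
    assert roundtripped.index.name == 'idx'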


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/bed01974
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/bed01974
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/bed01974

Branch: refs/heads/master
Commit: bed01974321d9d1edeae9e474bd9df020b42ea10
Parents: e7e8d61
Author: Phillip Cloud <cp...@gmail.com>
Authored: Tue May 16 13:14:18 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue May 16 13:14:18 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/type-test.cc                  |  16 ++
 cpp/src/arrow/type.cc                       |  11 +-
 cpp/src/arrow/type.h                        |   7 +-
 python/pyarrow/__init__.py                  |   3 +-
 python/pyarrow/_parquet.pxd                 |   5 +
 python/pyarrow/_parquet.pyx                 |  61 +++---
 python/pyarrow/array.pxi                    |  92 +++++----
 python/pyarrow/includes/common.pxd          |   2 +-
 python/pyarrow/includes/libarrow.pxd        |   6 +-
 python/pyarrow/io.pxi                       |   2 +-
 python/pyarrow/ipc.py                       |  40 ++++
 python/pyarrow/lib.pxd                      |  32 ++--
 python/pyarrow/memory.pxi                   |   2 +-
 python/pyarrow/pandas_compat.py             | 104 +++++++++++
 python/pyarrow/parquet.py                   |  63 ++++++-
 python/pyarrow/table.pxi                    | 225 ++++++++++++++++-------
 python/pyarrow/tests/pandas_examples.py     |   8 +-
 python/pyarrow/tests/test_convert_pandas.py |  45 +++--
 python/pyarrow/tests/test_ipc.py            |  51 +++++
 python/pyarrow/tests/test_parquet.py        | 108 +++++++++--
 python/pyarrow/tests/test_table.py          |   2 +-
 21 files changed, 697 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/cpp/src/arrow/type-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc
index e73adec..1fbb683 100644
--- a/cpp/src/arrow/type-test.cc
+++ b/cpp/src/arrow/type-test.cc
@@ -152,6 +152,22 @@ TEST_F(TestSchema, GetFieldByName) {
   ASSERT_TRUE(result == nullptr);
 }
 
+TEST_F(TestSchema, GetFieldIndex) {
+  auto f0 = field("f0", int32());
+  auto f1 = field("f1", uint8(), false);
+  auto f2 = field("f2", utf8());
+  auto f3 = field("f3", list(int16()));
+
+  vector<shared_ptr<Field>> fields = {f0, f1, f2, f3};
+  auto schema = std::make_shared<Schema>(fields);
+
+  ASSERT_EQ(0, schema->GetFieldIndex(fields[0]->name()));
+  ASSERT_EQ(1, schema->GetFieldIndex(fields[1]->name()));
+  ASSERT_EQ(2, schema->GetFieldIndex(fields[2]->name()));
+  ASSERT_EQ(3, schema->GetFieldIndex(fields[3]->name()));
+  ASSERT_EQ(-1, schema->GetFieldIndex("not-found"));
+}
+
 TEST_F(TestSchema, TestMetadataConstruction) {
   auto f0 = field("f0", int32());
   auto f1 = field("f1", uint8(), false);

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index afb3027..891045e 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -265,7 +265,12 @@ bool Schema::Equals(const Schema& other) const {
   return true;
 }
 
-std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {
+std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) const {
+  int64_t i = GetFieldIndex(name);
+  return i == -1 ? nullptr : fields_[i];
+}
+
+int64_t Schema::GetFieldIndex(const std::string& name) const {
   if (fields_.size() > 0 && name_to_index_.size() == 0) {
     for (size_t i = 0; i < fields_.size(); ++i) {
       name_to_index_[fields_[i]->name()] = static_cast<int>(i);
@@ -274,9 +279,9 @@ std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {
 
   auto it = name_to_index_.find(name);
   if (it == name_to_index_.end()) {
-    return nullptr;
+    return -1;
   } else {
-    return fields_[it->second];
+    return it->second;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 40615f7..3e85291 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -699,7 +699,10 @@ class ARROW_EXPORT Schema {
   std::shared_ptr<Field> field(int i) const { return fields_[i]; }
 
   // Returns nullptr if name not found
-  std::shared_ptr<Field> GetFieldByName(const std::string& name);
+  std::shared_ptr<Field> GetFieldByName(const std::string& name) const;
+
+  // Returns -1 if name not found
+  int64_t GetFieldIndex(const std::string& name) const;
 
   const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; }
   std::shared_ptr<const KeyValueMetadata> metadata() const { return metadata_; }
@@ -720,7 +723,7 @@ class ARROW_EXPORT Schema {
 
  private:
   std::vector<std::shared_ptr<Field>> fields_;
-  std::unordered_map<std::string, int> name_to_index_;
+  mutable std::unordered_map<std::string, int> name_to_index_;
 
   std::shared_ptr<const KeyValueMetadata> metadata_;
 };

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 632a443..0f34121 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -104,7 +104,8 @@ from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
 from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter,
                          RecordBatchStreamReader, RecordBatchStreamWriter,
                          open_stream,
-                         open_file)
+                         open_file,
+                         serialize_pandas, deserialize_pandas)
 
 
 localfs = LocalFilesystem.get_instance()

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 9f6edc0..2f6b9a9 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -20,6 +20,7 @@
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
                                         CTable, CMemoryPool,
+                                        CKeyValueMetadata,
                                         RandomAccessFile, OutputStream)
 
 
@@ -164,6 +165,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
 
         unique_ptr[CRowGroupMetaData] RowGroup(int i)
         const SchemaDescriptor* schema()
+        shared_ptr[const CKeyValueMetadata] key_value_metadata() const
 
     cdef cppclass ReaderProperties:
         pass
@@ -229,8 +231,11 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 
 cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
     CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                              const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
                               shared_ptr[CSchema]* out)
+
     CStatus ToParquetSchema(const CSchema* arrow_schema,
+                            const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
                             shared_ptr[SchemaDescriptor]* out)
 
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 51bd938..77ef7ad 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -40,15 +40,15 @@ cdef class RowGroupMetaData:
     cdef:
         unique_ptr[CRowGroupMetaData] up_metadata
         CRowGroupMetaData* metadata
-        object parent
+        FileMetaData parent
 
     def __cinit__(self):
         pass
 
-    cdef init_from_file(self, FileMetaData parent, int i):
+    cdef void init_from_file(self, FileMetaData parent, int i):
         if i < 0 or i >= parent.num_row_groups:
             raise IndexError('{0} out of bounds'.format(i))
-        self.up_metadata = parent.metadata.RowGroup(i)
+        self.up_metadata = parent._metadata.RowGroup(i)
         self.metadata = self.up_metadata.get()
         self.parent = parent
 
@@ -80,15 +80,15 @@ cdef class RowGroupMetaData:
 cdef class FileMetaData:
     cdef:
         shared_ptr[CFileMetaData] sp_metadata
-        CFileMetaData* metadata
-        object _schema
+        CFileMetaData* _metadata
+        ParquetSchema _schema
 
     def __cinit__(self):
         pass
 
     cdef init(self, const shared_ptr[CFileMetaData]& metadata):
         self.sp_metadata = metadata
-        self.metadata = metadata.get()
+        self._metadata = metadata.get()
 
     def __repr__(self):
         return """{0}
@@ -116,27 +116,27 @@ cdef class FileMetaData:
     property serialized_size:
 
         def __get__(self):
-            return self.metadata.size()
+            return self._metadata.size()
 
     property num_columns:
 
         def __get__(self):
-            return self.metadata.num_columns()
+            return self._metadata.num_columns()
 
     property num_rows:
 
         def __get__(self):
-            return self.metadata.num_rows()
+            return self._metadata.num_rows()
 
     property num_row_groups:
 
         def __get__(self):
-            return self.metadata.num_row_groups()
+            return self._metadata.num_row_groups()
 
     property format_version:
 
         def __get__(self):
-            cdef ParquetVersion version = self.metadata.version()
+            cdef ParquetVersion version = self._metadata.version()
             if version == ParquetVersion_V1:
                 return '1.0'
             if version == ParquetVersion_V2:
@@ -149,7 +149,7 @@ cdef class FileMetaData:
     property created_by:
 
         def __get__(self):
-            return frombytes(self.metadata.created_by())
+            return frombytes(self._metadata.created_by())
 
     def row_group(self, int i):
         """
@@ -159,14 +159,26 @@ cdef class FileMetaData:
         result.init_from_file(self, i)
         return result
 
+    property metadata:
+
+        def __get__(self):
+            cdef:
+                unordered_map[c_string, c_string] metadata
+                const CKeyValueMetadata* underlying_metadata
+            underlying_metadata = self._metadata.key_value_metadata().get()
+            if underlying_metadata != NULL:
+                underlying_metadata.ToUnorderedMap(&metadata)
+                return metadata
+            else:
+                return None
+
 
 cdef class ParquetSchema:
     cdef:
-        object parent  # the FileMetaData owning the SchemaDescriptor
+        FileMetaData parent  # the FileMetaData owning the SchemaDescriptor
         const SchemaDescriptor* schema
 
     def __cinit__(self):
-        self.parent = None
         self.schema = NULL
 
     def __repr__(self):
@@ -186,7 +198,7 @@ cdef class ParquetSchema:
 
     cdef init_from_filemeta(self, FileMetaData container):
         self.parent = container
-        self.schema = container.metadata.schema()
+        self.schema = container._metadata.schema()
 
     def __len__(self):
         return self.schema.num_columns()
@@ -211,7 +223,9 @@ cdef class ParquetSchema:
             shared_ptr[CSchema] sp_arrow_schema
 
         with nogil:
-            check_status(FromParquetSchema(self.schema, &sp_arrow_schema))
+            check_status(FromParquetSchema(
+                self.schema, self.parent._metadata.key_value_metadata(),
+                &sp_arrow_schema))
 
         return pyarrow_wrap_schema(sp_arrow_schema)
 
@@ -232,7 +246,7 @@ cdef class ParquetSchema:
 
 cdef class ColumnSchema:
     cdef:
-        object parent
+        ParquetSchema parent
         const ColumnDescriptor* descr
 
     def __cinit__(self):
@@ -463,7 +477,7 @@ cdef class ParquetReader:
         """
         cdef:
             FileMetaData container = self.metadata
-            const CFileMetaData* metadata = container.metadata
+            const CFileMetaData* metadata = container._metadata
             int i = 0
 
         if self.column_idx_map is None:
@@ -488,12 +502,13 @@ cdef class ParquetReader:
         return array
 
 
-cdef check_compression_name(name):
+cdef int check_compression_name(name) except -1:
     if name.upper() not in ['NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI']:
         raise ArrowException("Unsupported compression: " + name)
+    return 0
 
 
-cdef ParquetCompression compression_from_name(object name):
+cdef ParquetCompression compression_from_name(str name):
     name = name.upper()
     if name == "SNAPPY":
         return ParquetCompression_SNAPPY
@@ -546,7 +561,7 @@ cdef class ParquetWriter:
                             maybe_unbox_memory_pool(memory_pool),
                             sink, properties, &self.writer))
 
-    cdef _set_version(self, WriterProperties.Builder* props):
+    cdef void _set_version(self, WriterProperties.Builder* props):
         if self.version is not None:
             if self.version == "1.0":
                 props.version(ParquetVersion_V1)
@@ -555,7 +570,7 @@ cdef class ParquetWriter:
             else:
                 raise ArrowException("Unsupported Parquet format version")
 
-    cdef _set_compression_props(self, WriterProperties.Builder* props):
+    cdef void _set_compression_props(self, WriterProperties.Builder* props):
         if isinstance(self.compression, basestring):
             check_compression_name(self.compression)
             props.compression(compression_from_name(self.compression))
@@ -564,7 +579,7 @@ cdef class ParquetWriter:
                 check_compression_name(codec)
                 props.compression(column, compression_from_name(codec))
 
-    cdef _set_dictionary_props(self, WriterProperties.Builder* props):
+    cdef void _set_dictionary_props(self, WriterProperties.Builder* props):
         if isinstance(self.use_dictionary, bool):
             if self.use_dictionary:
                 props.enable_dictionary()

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/array.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi
index c132269..5930de3 100644
--- a/python/pyarrow/array.pxi
+++ b/python/pyarrow/array.pxi
@@ -159,7 +159,7 @@ cdef class Field:
     def __cinit__(self):
         pass
 
-    cdef init(self, const shared_ptr[CField]& field):
+    cdef void init(self, const shared_ptr[CField]& field):
         self.sp_field = field
         self.field = field.get()
         self.type = pyarrow_wrap_data_type(field.get().type())
@@ -264,11 +264,11 @@ cdef class Schema:
 
         return result
 
-    cdef init(self, const vector[shared_ptr[CField]]& fields):
+    cdef void init(self, const vector[shared_ptr[CField]]& fields):
         self.schema = new CSchema(fields)
         self.sp_schema.reset(self.schema)
 
-    cdef init_schema(self, const shared_ptr[CSchema]& schema):
+    cdef void init_schema(self, const shared_ptr[CSchema]& schema):
         self.schema = schema.get()
         self.sp_schema = schema
 
@@ -310,6 +310,9 @@ cdef class Schema:
         """
         return pyarrow_wrap_field(self.schema.GetFieldByName(tobytes(name)))
 
+    def get_field_index(self, name):
+        return self.schema.GetFieldIndex(tobytes(name))
+
     def add_metadata(self, dict metadata):
         """
         Add metadata as dict of string keys and values to Schema
@@ -352,9 +355,9 @@ cdef class Schema:
         return self.__str__()
 
 
-cdef box_metadata(const CKeyValueMetadata* metadata):
+cdef dict box_metadata(const CKeyValueMetadata* metadata):
     cdef unordered_map[c_string, c_string] result
-    if metadata != NULL:
+    if metadata != nullptr:
         metadata.ToUnorderedMap(&result)
         return result
     else:
@@ -813,45 +816,60 @@ cdef class Date64Value(ArrayValue):
             ap.Value(self.index) / 1000).date()
 
 
+cdef dict DATETIME_CONVERSION_FUNCTIONS
+
+try:
+    import pandas as pd
+except ImportError:
+    DATETIME_CONVERSION_FUNCTIONS = {
+        TimeUnit_SECOND: lambda x, tzinfo: (
+            datetime.datetime.utcfromtimestamp(x).replace(tzinfo=tzinfo)
+        ),
+        TimeUnit_MILLI: lambda x, tzinfo: (
+            datetime.datetime.utcfromtimestamp(x / 1e3).replace(tzinfo=tzinfo)
+        ),
+        TimeUnit_MICRO: lambda x, tzinfo: (
+            datetime.datetime.utcfromtimestamp(x / 1e6).replace(tzinfo=tzinfo)
+        ),
+    }
+else:
+    DATETIME_CONVERSION_FUNCTIONS = {
+        TimeUnit_SECOND: lambda x, tzinfo: pd.Timestamp(
+            x * 1000000000, tz=tzinfo, unit='ns',
+        ),
+        TimeUnit_MILLI: lambda x, tzinfo: pd.Timestamp(
+            x * 1000000, tz=tzinfo, unit='ns',
+        ),
+        TimeUnit_MICRO: lambda x, tzinfo: pd.Timestamp(
+            x * 1000, tz=tzinfo, unit='ns',
+        ),
+        TimeUnit_NANO: lambda x, tzinfo: pd.Timestamp(
+            x, tz=tzinfo, unit='ns',
+        )
+    }
+
+
 cdef class TimestampValue(ArrayValue):
 
     def as_py(self):
         cdef:
             CTimestampArray* ap = <CTimestampArray*> self.sp_array.get()
-            CTimestampType* dtype = <CTimestampType*>ap.type().get()
-            int64_t val = ap.Value(self.index)
+            CTimestampType* dtype = <CTimestampType*> ap.type().get()
+            int64_t value = ap.Value(self.index)
 
-        timezone = None
-        tzinfo = None
-        if dtype.timezone().size() > 0:
-            timezone = frombytes(dtype.timezone())
+        if not dtype.timezone().empty():
             import pytz
-            tzinfo = pytz.timezone(timezone)
+            tzinfo = pytz.timezone(frombytes(dtype.timezone()))
+        else:
+            tzinfo = None
 
         try:
-            pd = _pandas()
-            if dtype.unit() == TimeUnit_SECOND:
-                val = val * 1000000000
-            elif dtype.unit() == TimeUnit_MILLI:
-                val = val * 1000000
-            elif dtype.unit() == TimeUnit_MICRO:
-                val = val * 1000
-            return pd.Timestamp(val, tz=tzinfo)
-        except ImportError:
-            if dtype.unit() == TimeUnit_SECOND:
-                result = datetime.datetime.utcfromtimestamp(val)
-            elif dtype.unit() == TimeUnit_MILLI:
-                result = datetime.datetime.utcfromtimestamp(float(val) / 1000)
-            elif dtype.unit() == TimeUnit_MICRO:
-                result = datetime.datetime.utcfromtimestamp(
-                    float(val) / 1000000)
-            else:
-                # TimeUnit_NANO
-                raise NotImplementedError("Cannot convert nanosecond "
-                                          "timestamps without pandas")
-            if timezone is not None:
-                result = result.replace(tzinfo=tzinfo)
-            return result
+            converter = DATETIME_CONVERSION_FUNCTIONS[dtype.unit()]
+        except KeyError:
+            raise ValueError(
+                'Cannot convert nanosecond timestamps without pandas'
+            )
+        return converter(value, tzinfo=tzinfo)
 
 
 cdef class FloatValue(ArrayValue):
@@ -1042,7 +1060,7 @@ def array(object sequence, DataType type=None, MemoryPool memory_pool=None):
 
 cdef class Array:
 
-    cdef init(self, const shared_ptr[CArray]& sp_array):
+    cdef void init(self, const shared_ptr[CArray]& sp_array):
         self.sp_array = sp_array
         self.ap = sp_array.get()
         self.type = pyarrow_wrap_data_type(self.sp_array.get().type())
@@ -1251,7 +1269,7 @@ cdef class Array:
 
 cdef class Tensor:
 
-    cdef init(self, const shared_ptr[CTensor]& sp_tensor):
+    cdef void init(self, const shared_ptr[CTensor]& sp_tensor):
         self.sp_tensor = sp_tensor
         self.tp = sp_tensor.get()
         self.type = pyarrow_wrap_data_type(self.tp.type())
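
The Schema.get_field_index binding added above exposes the new C++
Schema::GetFieldIndex. A quick sketch of the expected behavior, mirroring the
C++ test earlier in this diff:

    import pyarrow as pa

    schema = pa.schema([pa.field('f0', pa.int32()),
                        pa.field('f1', pa.string())])

    schema.get_field_index('f1')         # -> 1
    schema.get_field_index('not-found')  # -> -1, same sentinel as the C++ API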

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index cc3b4b6..73bfb4f 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -18,7 +18,7 @@
 # distutils: language = c++
 
 from libc.stdint cimport *
-from libcpp cimport bool as c_bool
+from libcpp cimport bool as c_bool, nullptr
 from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
 from libcpp.string cimport string as c_string
 from libcpp.vector cimport vector

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index a7e2733..9df31c8 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -29,6 +29,7 @@ cdef extern from "arrow/util/key_value_metadata.h" namespace "arrow" nogil:
         void Append(const c_string& key, const c_string& value)
         void ToUnorderedMap(unordered_map[c_string, c_string]*) const
 
+
 cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
     enum Type" arrow::Type::type":
@@ -205,7 +206,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
         shared_ptr[CField] field(int i)
         shared_ptr[const CKeyValueMetadata] metadata()
-        shared_ptr[CField] GetFieldByName(c_string& name)
+        shared_ptr[CField] GetFieldByName(const c_string& name)
+        int64_t GetFieldIndex(const c_string& name)
         int num_fields()
         c_string ToString()
 
@@ -686,8 +688,10 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
     cdef cppclass PyBytesReader(CBufferReader):
         PyBytesReader(object fo)
 
+
 cdef extern from 'arrow/python/init.h':
     int arrow_init_numpy() except -1
 
+
 cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
     void set_numpy_nan(object o)

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index a153f22..23eb6ef 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -462,7 +462,7 @@ cdef class Buffer:
     def __cinit__(self):
         pass
 
-    cdef init(self, const shared_ptr[CBuffer]& buffer):
+    cdef void init(self, const shared_ptr[CBuffer]& buffer):
         self.buffer = buffer
         self.shape[0] = self.size
         self.strides[0] = <Py_ssize_t>(1)

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index 8338de3..a61d746 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -17,6 +17,7 @@
 
 # Arrow file and stream reader/writer classes, and other messaging tools
 
+import pyarrow as pa
 import pyarrow.lib as lib
 
 
@@ -119,3 +120,42 @@ def open_file(source, footer_offset=None):
     reader : RecordBatchFileReader
     """
     return RecordBatchFileReader(source, footer_offset=footer_offset)
+
+
+def serialize_pandas(df):
+    """Serialize a pandas DataFrame into a buffer protocol compatible object.
+
+    Parameters
+    ----------
+    df : pandas.DataFrame
+
+    Returns
+    -------
+    buf : buffer
+        An object compatible with the buffer protocol
+    """
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.InMemoryOutputStream()
+    writer = pa.RecordBatchFileWriter(sink, batch.schema)
+    writer.write_batch(batch)
+    writer.close()
+    return sink.get_result()
+
+
+def deserialize_pandas(buf, nthreads=1):
+    """Deserialize a buffer protocol compatible object into a pandas DataFrame.
+
+    Parameters
+    ----------
+    buf : buffer
+        An object compatible with the buffer protocol
+    nthreads : int, optional
+        The number of threads to use to convert the buffer to a DataFrame.
+
+    Returns
+    -------
+    df : pandas.DataFrame
+    """
+    buffer_reader = pa.BufferReader(buf)
+    reader = pa.RecordBatchFileReader(buffer_reader)
+    return reader.read_all().to_pandas(nthreads=nthreads)
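
A round-trip sketch of the two helpers above, mirroring the new tests added
in test_ipc.py later in this diff:

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'foo': [1.5, 1.6, 1.7]},
                      index=pd.Index([1, 2, 3], name='my_index'))

    buf = pa.serialize_pandas(df)             # buffer-protocol compatible object
    result = pa.deserialize_pandas(buf, nthreads=2)
    # 'my_index' survives because RecordBatch.from_pandas stores the index
    # (and its metadata) by default.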

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/lib.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index d3d03aa..4a2ab86 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -33,7 +33,7 @@ cdef class MemoryPool:
     cdef:
         CMemoryPool* pool
 
-    cdef init(self, CMemoryPool* pool)
+    cdef void init(self, CMemoryPool* pool)
 
 
 cdef class LoggingMemoryPool(MemoryPool):
@@ -89,7 +89,7 @@ cdef class Field:
     cdef readonly:
         DataType type
 
-    cdef init(self, const shared_ptr[CField]& field)
+    cdef void init(self, const shared_ptr[CField]& field)
 
 
 cdef class Schema:
@@ -97,8 +97,8 @@ cdef class Schema:
         shared_ptr[CSchema] sp_schema
         CSchema* schema
 
-    cdef init(self, const vector[shared_ptr[CField]]& fields)
-    cdef init_schema(self, const shared_ptr[CSchema]& schema)
+    cdef void init(self, const vector[shared_ptr[CField]]& fields)
+    cdef void init_schema(self, const shared_ptr[CSchema]& schema)
 
 
 cdef class Scalar:
@@ -155,7 +155,7 @@ cdef class Array:
     cdef readonly:
         DataType type
 
-    cdef init(self, const shared_ptr[CArray]& sp_array)
+    cdef void init(self, const shared_ptr[CArray]& sp_array)
     cdef getitem(self, int64_t i)
 
 
@@ -167,7 +167,7 @@ cdef class Tensor:
     cdef readonly:
         DataType type
 
-    cdef init(self, const shared_ptr[CTensor]& sp_tensor)
+    cdef void init(self, const shared_ptr[CTensor]& sp_tensor)
 
 
 cdef class NullArray(Array):
@@ -266,8 +266,8 @@ cdef class ChunkedArray:
         shared_ptr[CChunkedArray] sp_chunked_array
         CChunkedArray* chunked_array
 
-    cdef init(self, const shared_ptr[CChunkedArray]& chunked_array)
-    cdef _check_nullptr(self)
+    cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array)
+    cdef int _check_nullptr(self) except -1
 
 
 cdef class Column:
@@ -275,8 +275,8 @@ cdef class Column:
         shared_ptr[CColumn] sp_column
         CColumn* column
 
-    cdef init(self, const shared_ptr[CColumn]& column)
-    cdef _check_nullptr(self)
+    cdef void init(self, const shared_ptr[CColumn]& column)
+    cdef int _check_nullptr(self) except -1
 
 
 cdef class Table:
@@ -284,8 +284,8 @@ cdef class Table:
         shared_ptr[CTable] sp_table
         CTable* table
 
-    cdef init(self, const shared_ptr[CTable]& table)
-    cdef _check_nullptr(self)
+    cdef void init(self, const shared_ptr[CTable]& table)
+    cdef int _check_nullptr(self) except -1
 
 
 cdef class RecordBatch:
@@ -294,8 +294,8 @@ cdef class RecordBatch:
         CRecordBatch* batch
         Schema _schema
 
-    cdef init(self, const shared_ptr[CRecordBatch]& table)
-    cdef _check_nullptr(self)
+    cdef void init(self, const shared_ptr[CRecordBatch]& table)
+    cdef int _check_nullptr(self) except -1
 
 
 cdef class Buffer:
@@ -304,7 +304,7 @@ cdef class Buffer:
         Py_ssize_t shape[1]
         Py_ssize_t strides[1]
 
-    cdef init(self, const shared_ptr[CBuffer]& buffer)
+    cdef void init(self, const shared_ptr[CBuffer]& buffer)
 
 
 cdef class NativeFile:
@@ -335,3 +335,5 @@ cdef public object pyarrow_wrap_tensor(const shared_ptr[CTensor]& sp_tensor)
 cdef public object pyarrow_wrap_column(const shared_ptr[CColumn]& ccolumn)
 cdef public object pyarrow_wrap_table(const shared_ptr[CTable]& ctable)
 cdef public object pyarrow_wrap_batch(const shared_ptr[CRecordBatch]& cbatch)
+
+cdef dict box_metadata(const CKeyValueMetadata* sp_metadata)

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/memory.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/memory.pxi b/python/pyarrow/memory.pxi
index 15d59d2..6671a01 100644
--- a/python/pyarrow/memory.pxi
+++ b/python/pyarrow/memory.pxi
@@ -21,7 +21,7 @@
 
 
 cdef class MemoryPool:
-    cdef init(self, CMemoryPool* pool):
+    cdef void init(self, CMemoryPool* pool):
         self.pool = pool
 
     def bytes_allocated(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/pandas_compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
new file mode 100644
index 0000000..2f72d6a
--- /dev/null
+++ b/python/pyarrow/pandas_compat.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import re
+import json
+import pandas as pd
+
+import six
+
+from pyarrow.compat import PY2
+
+
+INDEX_LEVEL_NAME_REGEX = re.compile(r'^__index_level_\d+__$')
+
+
+def is_unnamed_index_level(name):
+    return INDEX_LEVEL_NAME_REGEX.match(name) is not None
+
+
+def infer_dtype(column):
+    try:
+        return pd.api.types.infer_dtype(column)
+    except AttributeError:
+        return pd.lib.infer_dtype(column)
+
+
+def get_column_metadata(column, name):
+    inferred_dtype = infer_dtype(column)
+    dtype = column.dtype
+
+    if hasattr(dtype, 'categories'):
+        extra_metadata = {
+            'num_categories': len(column.cat.categories),
+            'ordered': column.cat.ordered,
+        }
+    elif hasattr(dtype, 'tz'):
+        extra_metadata = {'timezone': str(dtype.tz)}
+    else:
+        extra_metadata = None
+
+    if not isinstance(name, six.string_types):
+        raise TypeError(
+            'Column name must be a string. Got column {} of type {}'.format(
+                name, type(name).__name__
+            )
+        )
+
+    return {
+        'name': name,
+        'pandas_type': {
+            'string': 'bytes' if PY2 else 'unicode',
+            'datetime64': (
+                'datetimetz' if hasattr(dtype, 'tz')
+                else 'datetime'
+            ),
+            'integer': str(dtype),
+            'floating': str(dtype),
+        }.get(inferred_dtype, inferred_dtype),
+        'numpy_dtype': str(dtype),
+        'metadata': extra_metadata,
+    }
+
+
+def index_level_name(index, i):
+    return index.name or '__index_level_{:d}__'.format(i)
+
+
+def construct_metadata(df, index_levels, preserve_index):
+    return {
+        b'pandas': json.dumps(
+            {
+                'index_columns': [
+                    index_level_name(level, i)
+                    for i, level in enumerate(index_levels)
+                ] if preserve_index else [],
+                'columns': [
+                    get_column_metadata(df[name], name=name)
+                    for name in df.columns
+                ] + (
+                    [
+                        get_column_metadata(
+                            level, name=index_level_name(level, i)
+                        )
+                        for i, level in enumerate(index_levels)
+                    ] if preserve_index else []
+                ),
+                'pandas_version': pd.__version__,
+            }
+        ).encode('utf8')
+    }
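
For reference, a sketch of the JSON that construct_metadata produces for a
single float column with a default, unnamed integer index (values shown in
the comments are illustrative):

    import json
    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'foo': [1.5, 1.6, 1.7]})
    table = pa.Table.from_pandas(df)
    json.loads(table.schema.metadata[b'pandas'].decode('utf8'))
    # {'index_columns': ['__index_level_0__'],
    #  'columns': [{'name': 'foo', 'pandas_type': 'float64',
    #               'numpy_dtype': 'float64', 'metadata': None},
    #              {'name': '__index_level_0__', 'pandas_type': 'int64',
    #               'numpy_dtype': 'int64', 'metadata': None}],
    #  'pandas_version': '0.20.1'}  # whichever pandas version is installed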

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 050ec31..e69d85e 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -15,6 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import itertools
+import json
+
 import six
 
 import numpy as np
@@ -101,14 +104,31 @@ class ParquetFile(object):
         column_indices = self._get_column_indices(columns)
         if nthreads is not None:
             self.reader.set_num_threads(nthreads)
+
+        return self.reader.read_all(column_indices=column_indices)
+
+    def read_pandas(self, columns=None, nthreads=1):
+        column_indices = self._get_column_indices(columns)
+        custom_metadata = self.metadata.metadata
+
+        if custom_metadata and b'pandas' in custom_metadata:
+            index_columns = json.loads(
+                custom_metadata[b'pandas'].decode('utf8')
+            )['index_columns']
+        else:
+            index_columns = []
+
+        if column_indices is not None and index_columns:
+            column_indices += map(self.reader.column_name_idx, index_columns)
+
+        if nthreads is not None:
+            self.reader.set_num_threads(nthreads)
         return self.reader.read_all(column_indices=column_indices)
 
     def _get_column_indices(self, column_names):
         if column_names is None:
             return None
-        else:
-            return [self.reader.column_name_idx(column)
-                    for column in column_names]
+        return list(map(self.reader.column_name_idx, column_names))
 
 
 # ----------------------------------------------------------------------
@@ -618,6 +638,43 @@ def read_table(source, columns=None, nthreads=1, metadata=None):
     return pf.read(columns=columns, nthreads=nthreads)
 
 
+def read_pandas(source, columns=None, nthreads=1, metadata=None):
+    """
+    Read a Table from Parquet format, reconstructing the index values if
+    available.
+
+    Parameters
+    ----------
+    source: str or pyarrow.io.NativeFile
+        Location of Parquet dataset. If a string passed, can be a single file
+        name. For passing Python file objects or byte buffers,
+        see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
+    columns: list
+        If not None, only these columns will be read from the file.
+    nthreads : int, default 1
+        Number of columns to read in parallel. Requires that the underlying
+        file source is threadsafe
+    metadata : FileMetaData
+        If separately computed
+
+    Returns
+    -------
+    pyarrow.Table
+        Content of the file as a Table of Columns, including DataFrame indexes
+        as Columns.
+    """
+    if is_string(source):
+        fs = LocalFilesystem.get_instance()
+        if fs.isdir(source):
+            raise NotImplementedError(
+                'Reading a directory of Parquet files with DataFrame index '
+                'metadata is not yet supported'
+            )
+
+    pf = ParquetFile(source, metadata=metadata)
+    return pf.read_pandas(columns=columns, nthreads=nthreads)
+
+
 def write_table(table, where, row_group_size=None, version='1.0',
                 use_dictionary=True, compression='snappy', **kwargs):
     """

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index bd8cce4..c9915c1 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -15,8 +15,18 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import json
+
 from collections import OrderedDict
 
+try:
+    import pandas as pd
+except ImportError:
+    # The pure-Python based API works without a pandas installation
+    pass
+else:
+    import pyarrow.pandas_compat as pdcompat
+
 
 cdef class ChunkedArray:
     """
@@ -30,14 +40,18 @@ cdef class ChunkedArray:
     def __cinit__(self):
         self.chunked_array = NULL
 
-    cdef init(self, const shared_ptr[CChunkedArray]& chunked_array):
+    cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array):
         self.sp_chunked_array = chunked_array
         self.chunked_array = chunked_array.get()
 
-    cdef _check_nullptr(self):
+    cdef int _check_nullptr(self) except -1:
         if self.chunked_array == NULL:
-            raise ReferenceError("ChunkedArray object references a NULL "
-                                 "pointer. Not initialized.")
+            raise ReferenceError(
+                "{} object references a NULL pointer. Not initialized.".format(
+                    type(self).__name__
+                )
+            )
+        return 0
 
     def length(self):
         self._check_nullptr()
@@ -111,7 +125,7 @@ cdef class Column:
     def __cinit__(self):
         self.column = NULL
 
-    cdef init(self, const shared_ptr[CColumn]& column):
+    cdef void init(self, const shared_ptr[CColumn]& column):
         self.sp_column = column
         self.column = column.get()
 
@@ -142,7 +156,7 @@ cdef class Column:
         check_status(libarrow.ConvertColumnToPandas(self.sp_column,
                                                     self, &out))
 
-        return _pandas().Series(wrap_array_output(out), name=self.name)
+        return pd.Series(wrap_array_output(out), name=self.name)
 
     def equals(self, Column other):
         """
@@ -175,14 +189,17 @@ cdef class Column:
         """
         return self.data.to_pylist()
 
-    cdef _check_nullptr(self):
+    cdef int _check_nullptr(self) except -1:
         if self.column == NULL:
-            raise ReferenceError("Column object references a NULL pointer."
-                    "Not initialized.")
+            raise ReferenceError(
+                "{} object references a NULL pointer. Not initialized.".format(
+                    type(self).__name__
+                )
+            )
+        return 0
 
     def __len__(self):
-        self._check_nullptr()
-        return self.column.length()
+        return self.length()
 
     def length(self):
         self._check_nullptr()
@@ -248,8 +265,9 @@ cdef class Column:
         return chunked_array
 
 
-cdef shared_ptr[const CKeyValueMetadata] key_value_metadata_from_dict(
-    dict metadata):
+cdef shared_ptr[const CKeyValueMetadata] unbox_metadata(dict metadata):
+    if metadata is None:
+        return <shared_ptr[const CKeyValueMetadata]> nullptr
     cdef:
         unordered_map[c_string, c_string] unordered_metadata = metadata
     return (<shared_ptr[const CKeyValueMetadata]>
@@ -289,27 +307,45 @@ cdef int _schema_from_arrays(
     else:
         raise TypeError(type(arrays[0]))
 
-    schema.reset(new CSchema(fields, key_value_metadata_from_dict(metadata)))
+    schema.reset(new CSchema(fields, unbox_metadata(metadata)))
     return 0
 
 
-cdef tuple _dataframe_to_arrays(df, bint timestamps_to_ms, Schema schema):
+cdef tuple _dataframe_to_arrays(
+    df,
+    bint timestamps_to_ms,
+    Schema schema,
+    bint preserve_index
+):
     cdef:
         list names = []
         list arrays = []
+        list index_levels = []
         DataType type = None
-        dict metadata = {}
+        dict metadata
+
+    if preserve_index:
+        index_levels.extend(getattr(df.index, 'levels', [df.index]))
 
     for name in df.columns:
         col = df[name]
         if schema is not None:
             type = schema.field_by_name(name).type
 
-        arr = Array.from_pandas(col, type=type,
-                                timestamps_to_ms=timestamps_to_ms)
+        arr = arrays.append(
+            Array.from_pandas(
+                col, type=type, timestamps_to_ms=timestamps_to_ms
+            )
+        )
         names.append(name)
-        arrays.append(arr)
 
+    for i, level in enumerate(index_levels):
+        arrays.append(
+            Array.from_pandas(level, timestamps_to_ms=timestamps_to_ms)
+        )
+        names.append(pdcompat.index_level_name(level, i))
+
+    metadata = pdcompat.construct_metadata(df, index_levels, preserve_index)
     return names, arrays, metadata
 
 
@@ -327,13 +363,18 @@ cdef class RecordBatch:
         self.batch = NULL
         self._schema = None
 
-    cdef init(self, const shared_ptr[CRecordBatch]& batch):
+    cdef void init(self, const shared_ptr[CRecordBatch]& batch):
         self.sp_batch = batch
         self.batch = batch.get()
 
-    cdef _check_nullptr(self):
+    cdef int _check_nullptr(self) except -1:
         if self.batch == NULL:
-            raise ReferenceError("Object not initialized")
+            raise ReferenceError(
+                "{} object references a NULL pointer. Not initialized.".format(
+                    type(self).__name__
+                )
+            )
+        return 0
 
     def __len__(self):
         self._check_nullptr()
@@ -455,22 +496,27 @@ cdef class RecordBatch:
         return Table.from_batches([self]).to_pandas(nthreads=nthreads)
 
     @classmethod
-    def from_pandas(cls, df, schema=None):
+    def from_pandas(cls, df, Schema schema=None, bint preserve_index=True):
         """
         Convert pandas.DataFrame to an Arrow RecordBatch
 
         Parameters
         ----------
         df: pandas.DataFrame
-        schema: pyarrow.Schema (optional)
+        schema: pyarrow.Schema, optional
             The expected schema of the RecordBatch. This can be used to
             indicate the type of columns if we cannot infer it automatically.
+        preserve_index : bool, optional
+            Whether to store the index as an additional column in the resulting
+            ``RecordBatch``.
 
         Returns
         -------
         pyarrow.RecordBatch
         """
-        names, arrays, metadata = _dataframe_to_arrays(df, False, schema)
+        names, arrays, metadata = _dataframe_to_arrays(
+            df, False, schema, preserve_index
+        )
         return cls.from_arrays(arrays, names, metadata)
 
     @staticmethod
@@ -503,7 +549,7 @@ cdef class RecordBatch:
             raise ValueError('Record batch cannot contain no arrays (for now)')
 
         num_rows = len(arrays[0])
-        _schema_from_arrays(arrays, names, metadata or {}, &schema)
+        _schema_from_arrays(arrays, names, metadata, &schema)
 
         c_arrays.reserve(len(arrays))
         for arr in arrays:
@@ -513,19 +559,55 @@ cdef class RecordBatch:
         return pyarrow_wrap_batch(batch)
 
 
-cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
-    cdef:
-        PyObject* result_obj
-        CColumn* col
-        int i
-
+cdef table_to_blockmanager(const shared_ptr[CTable]& ctable, int nthreads):
     import pandas.core.internals as _int
     from pandas import RangeIndex, Categorical
     from pyarrow.compat import DatetimeTZDtype
 
+    cdef:
+        Table table = pyarrow_wrap_table(ctable)
+        Table block_table = pyarrow_wrap_table(ctable)
+        Schema schema = table.schema
+
+        size_t row_count = table.num_rows
+        size_t total_columns = table.num_columns
+
+        dict metadata = schema.metadata
+        dict pandas_metadata = None
+
+        list index_columns = []
+        list index_arrays = []
+
+    if metadata is not None and b'pandas' in metadata:
+        pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
+        index_columns = pandas_metadata['index_columns']
+
+    cdef:
+        Column col
+        int64_t i
+
+    for name in index_columns:
+        i = schema.get_field_index(name)
+        if i != -1:
+            col = table.column(i)
+            index_name = None if pdcompat.is_unnamed_index_level(name) else name
+            index_arrays.append(
+                pd.Index(col.to_pandas().values, name=index_name)
+            )
+            block_table = block_table.remove_column(
+                block_table.schema.get_field_index(name)
+            )
+
+    cdef:
+        PyObject* result_obj
+        shared_ptr[CTable] c_block_table = block_table.sp_table
+
     with nogil:
-        check_status(libarrow.ConvertTableToPandas(table, nthreads,
-                                                   &result_obj))
+        check_status(
+            libarrow.ConvertTableToPandas(
+                c_block_table, nthreads, &result_obj
+            )
+        )
 
     result = PyObject_to_object(result_obj)
 
@@ -549,12 +631,13 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
             block = _int.make_block(block_arr, placement=placement)
         blocks.append(block)
 
-    names = []
-    for i in range(table.get().num_columns()):
-        col = table.get().column(i).get()
-        names.append(frombytes(col.name()))
+    cdef list axes = [
+        [column.name for column in block_table.itercolumns()],
+        pd.MultiIndex.from_arrays(
+            index_arrays
+        ) if index_arrays else pd.RangeIndex(row_count),
+    ]
 
-    axes = [names, RangeIndex(table.get().num_rows())]
     return _int.BlockManager(blocks, axes)
 
 
@@ -572,16 +655,18 @@ cdef class Table:
         self.table = NULL
 
     def __repr__(self):
-        return 'pyarrow.Table\n{0}'.format(str(self.schema))
+        return 'pyarrow.{}\n{}'.format(type(self).__name__, str(self.schema))
 
-    cdef init(self, const shared_ptr[CTable]& table):
+    cdef void init(self, const shared_ptr[CTable]& table):
         self.sp_table = table
         self.table = table.get()
 
-    cdef _check_nullptr(self):
-        if self.table == NULL:
-            raise ReferenceError("Table object references a NULL pointer."
-                    "Not initialized.")
+    cdef int _check_nullptr(self) except -1:
+        if self.table == nullptr:
+            raise ReferenceError(
+                "Table object references a NULL pointer. Not initialized."
+            )
+        return 0
 
     def equals(self, Table other):
         """
@@ -609,22 +694,29 @@ cdef class Table:
         return result
 
     @classmethod
-    def from_pandas(cls, df, timestamps_to_ms=False, schema=None):
+    def from_pandas(
+        cls,
+        df,
+        bint timestamps_to_ms=False,
+        Schema schema=None,
+        bint preserve_index=True
+    ):
         """
         Convert pandas.DataFrame to an Arrow Table
 
         Parameters
         ----------
-        df: pandas.DataFrame
-
-        timestamps_to_ms: bool
+        df : pandas.DataFrame
+        timestamps_to_ms : bool
             Convert datetime columns to ms resolution. This is needed for
             compability with other functionality like Parquet I/O which
             only supports milliseconds.
-
-        schema: pyarrow.Schema (optional)
+        schema : pyarrow.Schema, optional
             The expected schema of the Arrow Table. This can be used to
             indicate the type of columns if we cannot infer it automatically.
+        preserve_index : bool, optional
+            Whether to store the index as an additional column in the resulting
+            ``Table``.
 
         Returns
         -------
@@ -642,9 +734,12 @@ cdef class Table:
         >>> pa.Table.from_pandas(df)
         <pyarrow.lib.Table object at 0x7f05d1fb1b40>
         """
-        names, arrays, metadata = _dataframe_to_arrays(df,
-                                             timestamps_to_ms=timestamps_to_ms,
-                                             schema=schema)
+        names, arrays, metadata = _dataframe_to_arrays(
+            df,
+            timestamps_to_ms=timestamps_to_ms,
+            schema=schema,
+            preserve_index=preserve_index
+        )
         return cls.from_arrays(arrays, names=names, metadata=metadata)
 
     @staticmethod
@@ -671,7 +766,7 @@ cdef class Table:
             shared_ptr[CTable] table
             size_t K = len(arrays)
 
-        _schema_from_arrays(arrays, names, metadata or {}, &schema)
+        _schema_from_arrays(arrays, names, metadata, &schema)
 
         columns.reserve(K)
 
@@ -734,7 +829,7 @@ cdef class Table:
             nthreads = cpu_count()
 
         mgr = table_to_blockmanager(self.sp_table, nthreads)
-        return _pandas().DataFrame(mgr)
+        return pd.DataFrame(mgr)
 
     def to_pydict(self):
         """
@@ -744,11 +839,16 @@ cdef class Table:
         -------
         OrderedDict
         """
-        entries = []
-        for i in range(self.table.num_columns()):
-            name = self.column(i).name
-            column = self.column(i).to_pylist()
-            entries.append((name, column))
+        cdef:
+            size_t i
+            size_t num_columns = self.table.num_columns()
+            list entries = []
+            Column column
+
+        for i in range(num_columns):
+            column = self.column(i)
+            entries.append((column.name, column.to_pylist()))
+
         return OrderedDict(entries)
 
     @property
@@ -846,8 +946,7 @@ cdef class Table:
         """
         Add column to Table at position. Returns new table
         """
-        cdef:
-            shared_ptr[CTable] c_table
+        cdef shared_ptr[CTable] c_table
 
         with nogil:
             check_status(self.table.AddColumn(i, column.sp_column, &c_table))

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/pandas_examples.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/pandas_examples.py b/python/pyarrow/tests/pandas_examples.py
index 313a3ae..17ad4b2 100644
--- a/python/pyarrow/tests/pandas_examples.py
+++ b/python/pyarrow/tests/pandas_examples.py
@@ -23,7 +23,7 @@ import pandas as pd
 import pyarrow as pa
 
 
-def dataframe_with_arrays():
+def dataframe_with_arrays(include_index=False):
     """
     Dataframe with numpy arrays columns of every possible primtive type.
 
@@ -72,13 +72,15 @@ def dataframe_with_arrays():
                  dtype='datetime64[ms]'),
     ]
 
+    if include_index:
+        fields.append(pa.field('__index_level_0__', pa.int64()))
     df = pd.DataFrame(arrays)
     schema = pa.schema(fields)
 
     return df, schema
 
 
-def dataframe_with_lists():
+def dataframe_with_lists(include_index=False):
     """
     Dataframe with list columns of every possible primtive type.
 
@@ -113,6 +115,8 @@ def dataframe_with_lists():
         [u"1", u"2", u"3"]
     ]
 
+    if include_index:
+        fields.append(pa.field('__index_level_0__', pa.int64()))
     df = pd.DataFrame(arrays)
     schema = pa.schema(fields)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index be35905..ca30455 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -67,9 +67,10 @@ class TestPandasConversion(unittest.TestCase):
 
     def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
                                 timestamps_to_ms=False, expected_schema=None,
-                                check_dtype=True, schema=None):
+                                check_dtype=True, schema=None,
+                                check_index=False):
         table = pa.Table.from_pandas(df, timestamps_to_ms=timestamps_to_ms,
-                                     schema=schema)
+                                     schema=schema, preserve_index=check_index)
         result = table.to_pandas(nthreads=nthreads)
         if expected_schema:
             assert table.schema.equals(expected_schema)
@@ -299,8 +300,11 @@ class TestPandasConversion(unittest.TestCase):
             })
         field = pa.field('datetime64', pa.timestamp('ms'))
         schema = pa.schema([field])
-        self._check_pandas_roundtrip(df, timestamps_to_ms=True,
-                                     expected_schema=schema)
+        self._check_pandas_roundtrip(
+            df,
+            timestamps_to_ms=True,
+            expected_schema=schema,
+        )
 
         df = pd.DataFrame({
             'datetime64': np.array([
@@ -311,8 +315,11 @@ class TestPandasConversion(unittest.TestCase):
             })
         field = pa.field('datetime64', pa.timestamp('ns'))
         schema = pa.schema([field])
-        self._check_pandas_roundtrip(df, timestamps_to_ms=False,
-                                     expected_schema=schema)
+        self._check_pandas_roundtrip(
+            df,
+            timestamps_to_ms=False,
+            expected_schema=schema,
+        )
 
     def test_timestamps_notimezone_nulls(self):
         df = pd.DataFrame({
@@ -324,8 +331,11 @@ class TestPandasConversion(unittest.TestCase):
             })
         field = pa.field('datetime64', pa.timestamp('ms'))
         schema = pa.schema([field])
-        self._check_pandas_roundtrip(df, timestamps_to_ms=True,
-                                     expected_schema=schema)
+        self._check_pandas_roundtrip(
+            df,
+            timestamps_to_ms=True,
+            expected_schema=schema,
+        )
 
         df = pd.DataFrame({
             'datetime64': np.array([
@@ -336,8 +346,11 @@ class TestPandasConversion(unittest.TestCase):
             })
         field = pa.field('datetime64', pa.timestamp('ns'))
         schema = pa.schema([field])
-        self._check_pandas_roundtrip(df, timestamps_to_ms=False,
-                                     expected_schema=schema)
+        self._check_pandas_roundtrip(
+            df,
+            timestamps_to_ms=False,
+            expected_schema=schema,
+        )
 
     def test_timestamps_with_timezone(self):
         df = pd.DataFrame({
@@ -370,7 +383,7 @@ class TestPandasConversion(unittest.TestCase):
                      None,
                      datetime.date(1970, 1, 1),
                      datetime.date(2040, 2, 26)]})
-        table = pa.Table.from_pandas(df)
+        table = pa.Table.from_pandas(df, preserve_index=False)
         field = pa.field('date', pa.date32())
         schema = pa.schema([field])
         assert table.schema.equals(schema)
@@ -446,7 +459,7 @@ class TestPandasConversion(unittest.TestCase):
     def test_column_of_arrays(self):
         df, schema = dataframe_with_arrays()
         self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
-        table = pa.Table.from_pandas(df, schema=schema)
+        table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
         assert table.schema.equals(schema)
 
         for column in df.columns:
@@ -456,7 +469,7 @@ class TestPandasConversion(unittest.TestCase):
     def test_column_of_lists(self):
         df, schema = dataframe_with_lists()
         self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
-        table = pa.Table.from_pandas(df, schema=schema)
+        table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
         assert table.schema.equals(schema)
 
         for column in df.columns:
@@ -543,7 +556,7 @@ class TestPandasConversion(unittest.TestCase):
                 decimal.Decimal('1234.439'),
             ]
         })
-        converted = pa.Table.from_pandas(expected)
+        converted = pa.Table.from_pandas(expected, preserve_index=False)
         field = pa.field('decimals', pa.decimal(7, 3))
         schema = pa.schema([field])
         assert converted.schema.equals(schema)
@@ -566,7 +579,7 @@ class TestPandasConversion(unittest.TestCase):
                 decimal.Decimal('129534.123731'),
             ]
         })
-        converted = pa.Table.from_pandas(expected)
+        converted = pa.Table.from_pandas(expected, preserve_index=False)
         field = pa.field('decimals', pa.decimal(12, 6))
         schema = pa.schema([field])
         assert converted.schema.equals(schema)
@@ -589,7 +602,7 @@ class TestPandasConversion(unittest.TestCase):
                 -decimal.Decimal('314292388910493.12343437128'),
             ]
         })
-        converted = pa.Table.from_pandas(expected)
+        converted = pa.Table.from_pandas(expected, preserve_index=False)
         field = pa.field('decimals', pa.decimal(26, 11))
         schema = pa.schema([field])
         assert converted.schema.equals(schema)

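The preserve_index flag exercised throughout these tests controls whether the
DataFrame index is carried along as an extra column plus schema metadata. A
minimal sketch of the two modes, assuming only the behaviour shown in the tests
above (names are illustrative):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': [1.0, 2.0, 3.0]},
                      index=pd.Index([10, 20, 30], name='my_index'))

    # Default: the index is stored as an additional column and described in
    # the schema's 'pandas' metadata so it can be reconstructed later.
    with_index = pa.Table.from_pandas(df)

    # preserve_index=False keeps only the data columns, so the resulting
    # schema matches a schema built from the columns alone.
    without_index = pa.Table.from_pandas(df, preserve_index=False)
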
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 994876d..eeea39a 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -240,6 +240,57 @@ def test_get_record_batch_size():
     assert pa.get_record_batch_size(batch) > (N * itemsize)
 
 
+def test_pandas_serialize_round_trip():
+    index = pd.Index([1, 2, 3], name='my_index')
+    columns = ['foo', 'bar']
+    df = pd.DataFrame(
+        {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
+        index=index, columns=columns
+    )
+    buf = pa.serialize_pandas(df)
+    result = pa.deserialize_pandas(buf)
+    assert_frame_equal(result, df)
+
+
+def test_pandas_serialize_round_trip_nthreads():
+    index = pd.Index([1, 2, 3], name='my_index')
+    columns = ['foo', 'bar']
+    df = pd.DataFrame(
+        {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
+        index=index, columns=columns
+    )
+    buf = pa.serialize_pandas(df)
+    result = pa.deserialize_pandas(buf, nthreads=2)
+    assert_frame_equal(result, df)
+
+
+def test_pandas_serialize_round_trip_multi_index():
+    index1 = pd.Index([1, 2, 3], name='level_1')
+    index2 = pd.Index(list('def'), name=None)
+    index = pd.MultiIndex.from_arrays([index1, index2])
+
+    columns = ['foo', 'bar']
+    df = pd.DataFrame(
+        {'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
+        index=index,
+        columns=columns,
+    )
+    buf = pa.serialize_pandas(df)
+    result = pa.deserialize_pandas(buf)
+    assert_frame_equal(result, df)
+
+
+@pytest.mark.xfail(
+    raises=TypeError,
+    reason='Non-string columns are not supported',
+)
+def test_pandas_serialize_round_trip_not_string_columns():
+    df = pd.DataFrame(list(zip([1.5, 1.6, 1.7], 'abc')))
+    buf = pa.serialize_pandas(df)
+    result = pa.deserialize_pandas(buf)
+    assert_frame_equal(result, df)
+
+
 def write_file(batch, sink):
     writer = pa.RecordBatchFileWriter(sink, batch.schema)
     writer.write_batch(batch)

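The serialize_pandas/deserialize_pandas helpers round-tripped above go through
an Arrow buffer that carries the index information in the schema metadata; a
minimal usage sketch based on those tests:

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'foo': [1.5, 1.6, 1.7], 'bar': list('abc')},
                      index=pd.Index([1, 2, 3], name='my_index'))

    buf = pa.serialize_pandas(df)                    # Arrow buffer with 'pandas' metadata
    result = pa.deserialize_pandas(buf, nthreads=2)  # named index is reconstructed
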
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 5dbe657..db446d3 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -19,6 +19,7 @@ from os.path import join as pjoin
 import datetime
 import io
 import os
+import json
 import pytest
 
 from pyarrow.compat import guid, u
@@ -31,15 +32,11 @@ import pandas as pd
 
 import pandas.util.testing as tm
 
-try:
-    import pyarrow.parquet as pq
-    HAVE_PARQUET = True
-except ImportError:
-    HAVE_PARQUET = False
+# Skip all parquet tests if we can't import pyarrow.parquet
+pq = pytest.importorskip('pyarrow.parquet')
 
-# XXX: Make Parquet tests opt-in rather than skip-if-not-build
-parquet = pytest.mark.skipif(not HAVE_PARQUET,
-                             reason='Parquet support not built')
+# Ignore these with pytest ... -m 'not parquet'
+parquet = pytest.mark.parquet
 
 
 @parquet
@@ -91,8 +88,55 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
 
     filename = tmpdir.join('pandas_rountrip.parquet')
     arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+    assert b'pandas' in arrow_table.schema.metadata
+
     pq.write_table(arrow_table, filename.strpath, version="2.0")
-    table_read = pq.read_table(filename.strpath)
+    table_read = pq.read_pandas(filename.strpath)
+    assert b'pandas' in table_read.schema.metadata
+
+    assert arrow_table.schema.metadata == table_read.schema.metadata
+
+    df_read = table_read.to_pandas()
+    tm.assert_frame_equal(df, df_read)
+
+
+@parquet

+def test_pandas_parquet_custom_metadata(tmpdir):
+    df = alltypes_sample(size=10000)
+
+    filename = tmpdir.join('pandas_rountrip.parquet')
+    arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+    assert b'pandas' in arrow_table.schema.metadata
+
+    pq.write_table(arrow_table, filename.strpath, version="2.0")
+    pf = pq.ParquetFile(filename.strpath)
+
+    md = pf.metadata.metadata
+    assert b'pandas' in md
+
+    js = json.loads(md[b'pandas'].decode('utf8'))
+    assert js['index_columns'] == ['__index_level_0__']
+
+
+@parquet
+def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tmpdir):
+    df = alltypes_sample(size=10000)
+
+    filename = tmpdir.join('pandas_rountrip.parquet')
+    arrow_table = pa.Table.from_pandas(
+        df, timestamps_to_ms=True, preserve_index=False
+    )
+    js = json.loads(arrow_table.schema.metadata[b'pandas'].decode('utf8'))
+    assert not js['index_columns']
+
+    pq.write_table(arrow_table, filename.strpath, version="2.0")
+    table_read = pq.read_pandas(filename.strpath)
+
+    js = json.loads(table_read.schema.metadata[b'pandas'].decode('utf8'))
+    assert not js['index_columns']
+
+    assert arrow_table.schema.metadata == table_read.schema.metadata
+
     df_read = table_read.to_pandas()
     tm.assert_frame_equal(df, df_read)
 
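For orientation, the b'pandas' key inspected above is UTF-8 encoded JSON stored
in the Parquet key/value metadata; it can be read back along these lines (the
file name is illustrative):

    import json
    import pyarrow.parquet as pq

    pf = pq.ParquetFile('example.parquet')
    md = pf.metadata.metadata          # raw bytes -> bytes key/value mapping
    pandas_meta = json.loads(md[b'pandas'].decode('utf8'))

    # 'index_columns' names the columns holding the original DataFrame index,
    # e.g. ['__index_level_0__'], or an empty list when the table was built
    # with preserve_index=False.
    print(pandas_meta['index_columns'])
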
@@ -167,7 +211,6 @@ def _test_dataframe(size=10000, seed=0):
         'int32': _random_integers(size, np.int32),
         'int64': _random_integers(size, np.int64),
         'float32': np.random.randn(size).astype(np.float32),
-        'float64': np.random.randn(size),
         'float64': np.arange(size, dtype=np.float64),
         'bool': np.random.randn(size) > 0,
         'strings': [tm.rands(10) for i in range(size)]
@@ -188,6 +231,18 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
 
 
 @parquet
+def test_read_pandas_column_subset(tmpdir):
+    df = _test_dataframe(10000)
+    arrow_table = pa.Table.from_pandas(df)
+    imos = pa.BufferOutputStream()
+    pq.write_table(arrow_table, imos, version="2.0")
+    buf = imos.get_result()
+    reader = pa.BufferReader(buf)
+    df_read = pq.read_pandas(reader, columns=['strings', 'uint8']).to_pandas()
+    tm.assert_frame_equal(df[['strings', 'uint8']], df_read)
+
+
+@parquet
 def test_pandas_parquet_pyfile_roundtrip(tmpdir):
     filename = tmpdir.join('pandas_pyfile_roundtrip.parquet').strpath
     size = 5
@@ -270,7 +325,7 @@ def test_parquet_metadata_api():
     meta = fileh.metadata
     repr(meta)
     assert meta.num_rows == len(df)
-    assert meta.num_columns == ncols
+    assert meta.num_columns == ncols + 1  # +1 for index
     assert meta.num_row_groups == 1
     assert meta.format_version == '2.0'
     assert 'parquet-cpp' in meta.created_by
@@ -278,7 +333,7 @@ def test_parquet_metadata_api():
     # Schema
     schema = fileh.schema
     assert meta.schema is schema
-    assert len(schema) == ncols
+    assert len(schema) == ncols + 1  # +1 for index
     repr(schema)
 
     col = schema[0]
@@ -292,7 +347,7 @@ def test_parquet_metadata_api():
     assert col.logical_type == 'NONE'
 
     with pytest.raises(IndexError):
-        schema[ncols]
+        schema[ncols + 1]  # +1 for index
 
     with pytest.raises(IndexError):
         schema[-1]
@@ -302,7 +357,7 @@ def test_parquet_metadata_api():
     repr(rg_meta)
 
     assert rg_meta.num_rows == len(df)
-    assert rg_meta.num_columns == ncols
+    assert rg_meta.num_columns == ncols + 1  # +1 for index
 
 
 @parquet
@@ -502,9 +557,22 @@ def test_read_single_row_group():
     result = pa.concat_tables(row_groups)
     tm.assert_frame_equal(df, result.to_pandas())
 
+
+@parquet
+def test_read_single_row_group_with_column_subset():
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+    a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+
+    buf = io.BytesIO()
+    pq.write_table(a_table, buf, row_group_size=N / K,
+                   compression='snappy', version='2.0')
+
+    buf.seek(0)
+    pf = pq.ParquetFile(buf)
+
     cols = df.columns[:2]
-    row_groups = [pf.read_row_group(i, columns=cols)
-                  for i in range(K)]
+    row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)]
     result = pa.concat_tables(row_groups)
     tm.assert_frame_equal(df[cols], result.to_pandas())
 
@@ -696,6 +764,9 @@ def test_read_multiple_files(tmpdir):
 
     assert result.equals(expected)
 
+    with pytest.raises(NotImplementedError):
+        pq.read_pandas(dirpath)
+
     # Read with provided metadata
     metadata = pq.ParquetFile(paths[0]).metadata
 
@@ -706,10 +777,11 @@ def test_read_multiple_files(tmpdir):
     assert result3.equals(expected)
 
     # Read column subset
-    to_read = [result[0], result[3], result[6]]
+    to_read = [result[0], result[2], result[6], result[result.num_columns - 1]]
+
     result = pa.localfs.read_parquet(
         dirpath, columns=[c.name for c in to_read])
-    expected = pa.Table.from_arrays(to_read)
+    expected = pa.Table.from_arrays(to_read, metadata=result.schema.metadata)
     assert result.equals(expected)
 
     # Read with multiple threads

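pq.read_pandas, added in this change, behaves like pq.read_table but keeps the
pandas metadata so the index can be restored on conversion; a short sketch
under the assumptions in the tests above (single file, illustrative path):

    import pyarrow.parquet as pq

    # Reading back a subset of columns still restores the index, because the
    # index columns and the 'pandas' metadata are carried along automatically.
    table = pq.read_pandas('data.parquet', columns=['strings', 'uint8'])
    df = table.to_pandas()

    # Note: the tests above expect NotImplementedError when read_pandas is
    # pointed at a directory of files rather than a single file.
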
http://git-wip-us.apache.org/repos/asf/arrow/blob/bed01974/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 72ce696..ed22011 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -142,7 +142,7 @@ def test_recordbatchlist_to_pandas():
 
     table = pa.Table.from_batches([batch1, batch2])
     result = table.to_pandas()
-    data = pd.concat([data1, data2], ignore_index=True)
+    data = pd.concat([data1, data2])
     assert_frame_equal(data, result)