You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/23 11:41:43 UTC

arrow git commit: ARROW-475: [Python] Add support for reading multiple Parquet files as a single pyarrow.Table

Repository: arrow
Updated Branches:
  refs/heads/master 5a161ebc1 -> 53a478dfb


ARROW-475: [Python] Add support for reading multiple Parquet files as a single pyarrow.Table

Also fixes a serious bug in which the data source passed to the ParquetReader gets garbage collected prematurely

Also implements ARROW-470

Author: Wes McKinney <we...@twosigma.com>

Closes #296 from wesm/ARROW-475 and squashes the following commits:

894d2a2 [Wes McKinney] Implement Filesystem abstraction, add Filesystem.read_parquet. Implement rudimentary shim on local filesystem
3927c2c [Wes McKinney] Test read multiple Parquet from HDFS, fix premature garbage collection error
4904b3b [Wes McKinney] Implement read_multiple_files function for multiple Parquet files as a single Arrow table


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/53a478df
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/53a478df
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/53a478df

Branch: refs/heads/master
Commit: 53a478dfb278dcae5ca7f300b70857662553d118
Parents: 5a161eb
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jan 23 06:41:35 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jan 23 06:41:35 2017 -0500

----------------------------------------------------------------------
 python/pyarrow/__init__.py                   |   6 +-
 python/pyarrow/_parquet.pyx                  |   3 +
 python/pyarrow/filesystem.py                 | 186 ++++++++++++++++++++++
 python/pyarrow/includes/libarrow_io.pxd      |   2 +
 python/pyarrow/io.pyx                        |  62 +++-----
 python/pyarrow/parquet.py                    |  88 ++++++++--
 python/pyarrow/table.pyx                     |  60 ++++---
 python/pyarrow/tests/test_column.py          |  49 ------
 python/pyarrow/tests/test_convert_builtin.py |   3 +-
 python/pyarrow/tests/test_convert_pandas.py  |   8 +-
 python/pyarrow/tests/test_hdfs.py            |  46 +++++-
 python/pyarrow/tests/test_parquet.py         | 155 +++++++++++++-----
 python/pyarrow/tests/test_scalars.py         |   2 +-
 python/pyarrow/tests/test_schema.py          |   1 -
 python/pyarrow/tests/test_table.py           |  50 ++++--
 python/pyarrow/util.py                       |  25 +++
 16 files changed, 568 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index efffbf2..d563c7a 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -42,7 +42,8 @@ from pyarrow.array import (Array,
 
 from pyarrow.error import ArrowException
 
-from pyarrow.io import (HdfsClient, HdfsFile, NativeFile, PythonFileInterface,
+from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
+from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface,
                         Buffer, InMemoryOutputStream, BufferReader)
 
 from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType,
@@ -61,3 +62,6 @@ from pyarrow.schema import (null, bool_,
                             DataType, Field, Schema, schema)
 
 from pyarrow.table import Column, RecordBatch, Table, concat_tables
+
+
+localfs = LocalFilesystem()

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 867fc4c..b11cee3 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -341,6 +341,7 @@ cdef logical_type_name_from_enum(ParquetLogicalType type_):
 
 cdef class ParquetReader:
     cdef:
+        object source
         MemoryPool* allocator
         unique_ptr[FileReader] reader
         column_idx_map
@@ -360,6 +361,8 @@ cdef class ParquetReader:
         if metadata is not None:
             c_metadata = metadata.sp_metadata
 
+        self.source = source
+
         get_reader(source, &rd_handle)
         with nogil:
             check_status(OpenFile(rd_handle, self.allocator, properties,

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
new file mode 100644
index 0000000..82409b7
--- /dev/null
+++ b/python/pyarrow/filesystem.py
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from os.path import join as pjoin
+import os
+
+from pyarrow.util import implements
+import pyarrow.io as io
+
+
+class Filesystem(object):
+    """
+    Abstract filesystem interface
+    """
+    def ls(self, path):
+        """
+        Return list of file paths
+        """
+        raise NotImplementedError
+
+    def delete(self, path, recursive=False):
+        """
+        Delete the indicated file or directory
+
+        Parameters
+        ----------
+        path : string
+        recursive : boolean, default False
+            If True, also delete child paths for directories
+        """
+        raise NotImplementedError
+
+    def mkdir(self, path, create_parents=True):
+        raise NotImplementedError
+
+    def exists(self, path):
+        raise NotImplementedError
+
+    def isdir(self, path):
+        """
+        Return True if path is a directory
+        """
+        raise NotImplementedError
+
+    def isfile(self, path):
+        """
+        Return True if path is a file
+        """
+        raise NotImplementedError
+
+    def read_parquet(self, path, columns=None, schema=None):
+        """
+        Read Parquet data from path in file system. Can read from a single file
+        or a directory of files
+
+        Parameters
+        ----------
+        path : str
+            Single file path or directory
+        columns : List[str], optional
+            Subset of columns to read
+        schema : pyarrow.parquet.Schema
+            Known schema to validate files against
+
+        Returns
+        -------
+        table : pyarrow.Table
+        """
+        from pyarrow.parquet import read_multiple_files
+
+        if self.isdir(path):
+            paths_to_read = []
+            for path in self.ls(path):
+                if path == '_metadata' or path == '_common_metadata':
+                    raise ValueError('No support yet for common metadata file')
+                paths_to_read.append(path)
+        else:
+            paths_to_read = [path]
+
+        return read_multiple_files(paths_to_read, columns=columns,
+                                   filesystem=self, schema=schema)
+
+
+class LocalFilesystem(Filesystem):
+
+    @implements(Filesystem.ls)
+    def ls(self, path):
+        return sorted(pjoin(path, x) for x in os.listdir(path))
+
+    @implements(Filesystem.isdir)
+    def isdir(self, path):
+        return os.path.isdir(path)
+
+    @implements(Filesystem.isfile)
+    def isfile(self, path):
+        return os.path.isfile(path)
+
+    @implements(Filesystem.exists)
+    def exists(self, path):
+        return os.path.exists(path)
+
+    def open(self, path, mode='rb'):
+        """
+        Open file for reading or writing
+        """
+        return open(path, mode=mode)
+
+
+class HdfsClient(io._HdfsClient, Filesystem):
+    """
+    Connect to an HDFS cluster. All parameters are optional and should
+    only be set if the defaults need to be overridden.
+
+    Authentication should be automatic if the HDFS cluster uses Kerberos.
+    However, if a username is specified, then the ticket cache will likely
+    be required.
+
+    Parameters
+    ----------
+    host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
+    port : NameNode's port. Set to 0 for default or logical (HA) nodes.
+    user : Username when connecting to HDFS; None implies login user.
+    kerb_ticket : Path to Kerberos ticket cache.
+    driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
+      Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
+      library from Pivotal Labs)
+
+    Notes
+    -----
+    The first time you call this method, it will take longer than usual due
+    to JNI spin-up time.
+
+    Returns
+    -------
+    client : HDFSClient
+    """
+
+    def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
+                 driver='libhdfs'):
+        self._connect(host, port, user, kerb_ticket, driver)
+
+    @implements(Filesystem.isdir)
+    def isdir(self, path):
+        return io._HdfsClient.isdir(self, path)
+
+    @implements(Filesystem.isfile)
+    def isfile(self, path):
+        return io._HdfsClient.isfile(self, path)
+
+    @implements(Filesystem.delete)
+    def delete(self, path, recursive=False):
+        return io._HdfsClient.delete(self, path, recursive)
+
+    @implements(Filesystem.mkdir)
+    def mkdir(self, path, create_parents=True):
+        return io._HdfsClient.mkdir(self, path)
+
+    def ls(self, path, full_info=False):
+        """
+        Retrieve directory contents and metadata, if requested.
+
+        Parameters
+        ----------
+        path : HDFS path
+        full_info : boolean, default False
+            If False, only return list of paths
+
+        Returns
+        -------
+        result : list of dicts (full_info=True) or strings (full_info=False)
+        """
+        return io._HdfsClient.ls(self, path, full_info)

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index 417af7d..3137938 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -148,6 +148,8 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
         CStatus ListDirectory(const c_string& path,
                               vector[HdfsPathInfo]* listing)
 
+        CStatus GetPathInfo(const c_string& path, HdfsPathInfo* info)
+
         CStatus Rename(const c_string& src, const c_string& dst)
 
         CStatus OpenReadable(const c_string& path,

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 0f626f1..2621512 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -463,42 +463,17 @@ def strip_hdfs_abspath(path):
         return path
 
 
-cdef class HdfsClient:
+cdef class _HdfsClient:
     cdef:
         shared_ptr[CHdfsClient] client
 
     cdef readonly:
         bint is_open
 
-    def __cinit__(self, host="default", port=0, user=None, kerb_ticket=None,
-                  driver='libhdfs'):
-        """
-        Connect to an HDFS cluster. All parameters are optional and should
-        only be set if the defaults need to be overridden.
-
-        Authentication should be automatic if the HDFS cluster uses Kerberos.
-        However, if a username is specified, then the ticket cache will likely
-        be required.
+    def __cinit__(self):
+        pass
 
-        Parameters
-        ----------
-        host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
-        port : NameNode's port. Set to 0 for default or logical (HA) nodes.
-        user : Username when connecting to HDFS; None implies login user.
-        kerb_ticket : Path to Kerberos ticket cache.
-        driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
-          Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
-          library from Pivotal Labs)
-
-        Notes
-        -----
-        The first time you call this method, it will take longer than usual due
-        to JNI spin-up time.
-
-        Returns
-        -------
-        client : HDFSClient
-        """
+    def _connect(self, host, port, user, kerb_ticket, driver):
         cdef HdfsConnectionConfig conf
 
         if host is not None:
@@ -556,20 +531,25 @@ cdef class HdfsClient:
             result = self.client.get().Exists(c_path)
         return result
 
-    def ls(self, path, bint full_info=True):
-        """
-        Retrieve directory contents and metadata, if requested.
+    def isdir(self, path):
+        cdef HdfsPathInfo info
+        self._path_info(path, &info)
+        return info.kind == ObjectType_DIRECTORY
 
-        Parameters
-        ----------
-        path : HDFS path
-        full_info : boolean, default True
-            If False, only return list of paths
+    def isfile(self, path):
+        cdef HdfsPathInfo info
+        self._path_info(path, &info)
+        return info.kind == ObjectType_FILE
 
-        Returns
-        -------
-        result : list of dicts (full_info=True) or strings (full_info=False)
-        """
+    cdef _path_info(self, path, HdfsPathInfo* info):
+        cdef c_string c_path = tobytes(path)
+
+        with nogil:
+            check_status(self.client.get()
+                         .GetPathInfo(c_path, info))
+
+
+    def ls(self, path, bint full_info):
         cdef:
             c_string c_path = tobytes(path)
             vector[HdfsPathInfo] listing

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2a1ac9d..cbe1c6e 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -15,8 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pyarrow._parquet as _parquet
-from pyarrow.table import Table
+from pyarrow._parquet import (ParquetReader, FileMetaData,  # noqa
+                              RowGroupMetaData, Schema, ParquetWriter)
+import pyarrow._parquet as _parquet  # noqa
+from pyarrow.table import Table, concat_tables
 
 
 class ParquetFile(object):
@@ -32,7 +34,7 @@ class ParquetFile(object):
         Use existing metadata object, rather than reading from file.
     """
     def __init__(self, source, metadata=None):
-        self.reader = _parquet.ParquetReader()
+        self.reader = ParquetReader()
         self.reader.open(source, metadata=metadata)
 
     @property
@@ -67,10 +69,10 @@ class ParquetFile(object):
                            for column in columns]
             arrays = [self.reader.read_column(column_idx)
                       for column_idx in column_idxs]
-            return Table.from_arrays(columns, arrays)
+            return Table.from_arrays(arrays, names=columns)
 
 
-def read_table(source, columns=None):
+def read_table(source, columns=None, metadata=None):
     """
     Read a Table from Parquet format
 
@@ -81,17 +83,79 @@ def read_table(source, columns=None):
         pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
     columns: list
         If not None, only these columns will be read from the file.
+    metadata : FileMetaData
+        If separately computed
 
     Returns
     -------
-    pyarrow.table.Table
+    pyarrow.Table
         Content of the file as a table (of columns)
     """
-    return ParquetFile(source).read(columns=columns)
+    return ParquetFile(source, metadata=metadata).read(columns=columns)
 
 
-def write_table(table, sink, chunk_size=None, version=None,
-                use_dictionary=True, compression=None):
+def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
+                        schema=None):
+    """
+    Read multiple Parquet files as a single pyarrow.Table
+
+    Parameters
+    ----------
+    paths : List[str]
+        List of file paths
+    columns : List[str]
+        Names of columns to read from the file
+    filesystem : Filesystem, default None
+        If nothing passed, paths assumed to be found in the local on-disk
+        filesystem
+    metadata : pyarrow.parquet.FileMetaData
+        Use metadata obtained elsewhere to validate file schemas
+    schema : pyarrow.parquet.Schema
+        Use schema obtained elsewhere to validate file schemas. Alternative to
+        metadata parameter
+
+    Returns
+    -------
+    pyarrow.Table
+        Content of the file as a table (of columns)
+    """
+    if filesystem is None:
+        def open_file(path, meta=None):
+            return ParquetFile(path, metadata=meta)
+    else:
+        def open_file(path, meta=None):
+            return ParquetFile(filesystem.open(path, mode='rb'), metadata=meta)
+
+    if len(paths) == 0:
+        raise ValueError('Must pass at least one file path')
+
+    if metadata is None and schema is None:
+        schema = open_file(paths[0]).schema
+    elif schema is None:
+        schema = metadata.schema
+
+    # Verify schemas are all equal
+    all_file_metadata = []
+    for path in paths:
+        file_metadata = open_file(path).metadata
+        if not schema.equals(file_metadata.schema):
+            raise ValueError('Schema in {0} was different. {1!s} vs {2!s}'
+                             .format(path, file_metadata.schema, schema))
+        all_file_metadata.append(file_metadata)
+
+    # Read the tables
+    tables = []
+    for path, path_metadata in zip(paths, all_file_metadata):
+        reader = open_file(path, meta=path_metadata)
+        table = reader.read(columns=columns)
+        tables.append(table)
+
+    all_data = concat_tables(tables)
+    return all_data
+
+
+def write_table(table, sink, chunk_size=None, version='1.0',
+                use_dictionary=True, compression='snappy'):
     """
     Write a Table to Parquet format
 
@@ -110,7 +174,7 @@ def write_table(table, sink, chunk_size=None, version=None,
     compression : str or dict
         Specify the compression codec, either on a general basis or per-column.
     """
-    writer = _parquet.ParquetWriter(sink, use_dictionary=use_dictionary,
-                                    compression=compression,
-                                    version=version)
+    writer = ParquetWriter(sink, use_dictionary=use_dictionary,
+                           compression=compression,
+                           version=version)
     writer.write_table(table, row_group_size=chunk_size)

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 0e3b2bd..9242330 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -265,16 +265,35 @@ cdef class Column:
 cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
     cdef:
         Array arr
+        Column col
         c_string c_name
         vector[shared_ptr[CField]] fields
+        cdef shared_ptr[CDataType] type_
 
     cdef int K = len(arrays)
 
     fields.resize(K)
-    for i in range(K):
-        arr = arrays[i]
-        c_name = tobytes(names[i])
-        fields[i].reset(new CField(c_name, arr.type.sp_type, True))
+
+    if len(arrays) == 0:
+        raise ValueError('Must pass at least one array')
+
+    if isinstance(arrays[0], Array):
+        if names is None:
+            raise ValueError('Must pass names when constructing '
+                             'from Array objects')
+        for i in range(K):
+            arr = arrays[i]
+            type_ = arr.type.sp_type
+            c_name = tobytes(names[i])
+            fields[i].reset(new CField(c_name, type_, True))
+    elif isinstance(arrays[0], Column):
+        for i in range(K):
+            col = arrays[i]
+            type_ = col.sp_column.get().type()
+            c_name = tobytes(col.name)
+            fields[i].reset(new CField(c_name, type_, True))
+    else:
+        raise TypeError(type(arrays[0]))
 
     schema.reset(new CSchema(fields))
 
@@ -429,19 +448,19 @@ cdef class RecordBatch:
         pyarrow.table.RecordBatch
         """
         names, arrays = _dataframe_to_arrays(df, None, False, schema)
-        return cls.from_arrays(names, arrays)
+        return cls.from_arrays(arrays, names)
 
     @staticmethod
-    def from_arrays(names, arrays):
+    def from_arrays(arrays, names):
         """
         Construct a RecordBatch from multiple pyarrow.Arrays
 
         Parameters
         ----------
-        names: list of str
-            Labels for the columns
         arrays: list of pyarrow.Array
             column-wise data vectors
+        names: list of str
+            Labels for the columns
 
         Returns
         -------
@@ -594,20 +613,20 @@ cdef class Table:
         names, arrays = _dataframe_to_arrays(df, name=name,
                                              timestamps_to_ms=timestamps_to_ms,
                                              schema=schema)
-        return cls.from_arrays(names, arrays, name=name)
+        return cls.from_arrays(arrays, names=names, name=name)
 
     @staticmethod
-    def from_arrays(names, arrays, name=None):
+    def from_arrays(arrays, names=None, name=None):
         """
-        Construct a Table from Arrow Arrays
+        Construct a Table from Arrow arrays or columns
 
         Parameters
         ----------
-
-        names: list of str
-            Names for the table columns
-        arrays: list of pyarrow.array.Array
+        arrays: list of pyarrow.Array or pyarrow.Column
             Equal-length arrays that should form the table.
+        names: list of str, optional
+            Names for the table columns. If Columns passed, will be
+            inferred. If Arrays passed, this argument is required
         name: str, optional
             name for the Table
 
@@ -617,7 +636,6 @@ cdef class Table:
 
         """
         cdef:
-            Array arr
             c_string c_name
             vector[shared_ptr[CField]] fields
             vector[shared_ptr[CColumn]] columns
@@ -628,9 +646,15 @@ cdef class Table:
 
         cdef int K = len(arrays)
         columns.resize(K)
+
         for i in range(K):
-            arr = arrays[i]
-            columns[i].reset(new CColumn(schema.get().field(i), arr.sp_array))
+            if isinstance(arrays[i], Array):
+                columns[i].reset(new CColumn(schema.get().field(i),
+                                             (<Array> arrays[i]).sp_array))
+            elif isinstance(arrays[i], Column):
+                columns[i] = (<Column> arrays[i]).sp_column
+            else:
+                raise ValueError(type(arrays[i]))
 
         if name is None:
             c_name = ''

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_column.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_column.py b/python/pyarrow/tests/test_column.py
deleted file mode 100644
index 1a507c8..0000000
--- a/python/pyarrow/tests/test_column.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.compat import unittest
-import pyarrow as arrow
-
-A = arrow
-
-import pandas as pd
-
-
-class TestColumn(unittest.TestCase):
-
-    def test_basics(self):
-        data = [
-            A.from_pylist([-10, -5, 0, 5, 10])
-        ]
-        table = A.Table.from_arrays(('a'), data, 'table_name')
-        column = table.column(0)
-        assert column.name == 'a'
-        assert column.length() == 5
-        assert len(column) == 5
-        assert column.shape == (5,)
-        assert column.to_pylist() == [-10, -5, 0, 5, 10]
-
-    def test_pandas(self):
-        data = [
-            A.from_pylist([-10, -5, 0, 5, 10])
-        ]
-        table = A.Table.from_arrays(('a'), data, 'table_name')
-        column = table.column(0)
-        series = column.to_pandas()
-        assert series.name == 'a'
-        assert series.shape == (5,)
-        assert series.iloc[0] == -10

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_convert_builtin.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py
index 72e4389..c06d18d 100644
--- a/python/pyarrow/tests/test_convert_builtin.py
+++ b/python/pyarrow/tests/test_convert_builtin.py
@@ -16,11 +16,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from pyarrow.compat import unittest, u
+from pyarrow.compat import unittest, u  # noqa
 import pyarrow
 
 import datetime
 
+
 class TestConvertList(unittest.TestCase):
 
     def test_boolean(self):

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index a2f5062..30705c4 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -74,7 +74,7 @@ class TestPandasConversion(unittest.TestCase):
         tm.assert_frame_equal(result, expected)
 
     def _check_array_roundtrip(self, values, expected=None,
-                                timestamps_to_ms=False, field=None):
+                               timestamps_to_ms=False, field=None):
         arr = A.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms,
                                   field=field)
         result = arr.to_pandas()
@@ -118,7 +118,7 @@ class TestPandasConversion(unittest.TestCase):
         ex_frame = pd.DataFrame(dict(zip(names, expected_cols)),
                                 columns=names)
 
-        table = A.Table.from_arrays(names, arrays)
+        table = A.Table.from_arrays(arrays, names)
         assert table.schema.equals(A.Schema.from_fields(fields))
         result = table.to_pandas()
         tm.assert_frame_equal(result, ex_frame)
@@ -169,7 +169,7 @@ class TestPandasConversion(unittest.TestCase):
         ex_frame = pd.DataFrame(dict(zip(int_dtypes, expected_cols)),
                                 columns=int_dtypes)
 
-        table = A.Table.from_arrays(int_dtypes, arrays)
+        table = A.Table.from_arrays(arrays, int_dtypes)
         result = table.to_pandas()
 
         tm.assert_frame_equal(result, ex_frame)
@@ -201,7 +201,7 @@ class TestPandasConversion(unittest.TestCase):
         schema = A.Schema.from_fields([field])
         ex_frame = pd.DataFrame({'bools': expected})
 
-        table = A.Table.from_arrays(['bools'], [arr])
+        table = A.Table.from_arrays([arr], ['bools'])
         assert table.schema.equals(schema)
         result = table.to_pandas()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index 2056f7a..cb24adb 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -21,9 +21,16 @@ import os
 import random
 import unittest
 
+import numpy as np
+import pandas.util.testing as pdt
 import pytest
 
+from pyarrow.compat import guid
+from pyarrow.filesystem import HdfsClient
 import pyarrow.io as io
+import pyarrow as pa
+
+import pyarrow.tests.test_parquet as test_parquet
 
 # ----------------------------------------------------------------------
 # HDFS tests
@@ -38,7 +45,7 @@ def hdfs_test_client(driver='libhdfs'):
         raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                          'an integer')
 
-    return io.HdfsClient(host, port, user, driver=driver)
+    return HdfsClient(host, port, user, driver=driver)
 
 
 class HdfsTestCases(object):
@@ -138,6 +145,43 @@ class HdfsTestCases(object):
 
         assert result == data
 
+    @test_parquet.parquet
+    def test_hdfs_read_multiple_parquet_files(self):
+        import pyarrow.parquet as pq
+
+        nfiles = 10
+        size = 5
+
+        tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
+
+        self.hdfs.mkdir(tmpdir)
+
+        test_data = []
+        paths = []
+        for i in range(nfiles):
+            df = test_parquet._test_dataframe(size, seed=i)
+
+            df['index'] = np.arange(i * size, (i + 1) * size)
+
+            # Hack so that we don't have a dtype cast in v1 files
+            df['uint32'] = df['uint32'].astype(np.int64)
+
+            path = pjoin(tmpdir, '{0}.parquet'.format(i))
+
+            table = pa.Table.from_pandas(df)
+            with self.hdfs.open(path, 'wb') as f:
+                pq.write_table(table, f)
+
+            test_data.append(table)
+            paths.append(path)
+
+        result = self.hdfs.read_parquet(tmpdir)
+        expected = pa.concat_tables(test_data)
+
+        pdt.assert_frame_equal(result.to_pandas()
+                               .sort_values(by='index').reset_index(drop=True),
+                               expected.to_pandas())
+
 
 class TestLibHdfs(HdfsTestCases, unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 9cf860a..a94fe45 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -15,10 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from os.path import join as pjoin
 import io
+import os
 import pytest
 
-import pyarrow as A
+from pyarrow.compat import guid
+import pyarrow as pa
 import pyarrow.io as paio
 
 import numpy as np
@@ -42,9 +45,9 @@ def test_single_pylist_column_roundtrip(tmpdir):
     for dtype in [int, float]:
         filename = tmpdir.join('single_{}_column.parquet'
                                .format(dtype.__name__))
-        data = [A.from_pylist(list(map(dtype, range(5))))]
-        table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
-        A.parquet.write_table(table, filename.strpath)
+        data = [pa.from_pylist(list(map(dtype, range(5))))]
+        table = pa.Table.from_arrays(data, names=('a', 'b'), name='table_name')
+        pq.write_table(table, filename.strpath)
         table_read = pq.read_table(filename.strpath)
         for col_written, col_read in zip(table.itercolumns(),
                                          table_read.itercolumns()):
@@ -85,8 +88,8 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
     df = alltypes_sample(size=10000)
 
     filename = tmpdir.join('pandas_rountrip.parquet')
-    arrow_table = A.Table.from_pandas(df, timestamps_to_ms=True)
-    A.parquet.write_table(arrow_table, filename.strpath, version="2.0")
+    arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+    pq.write_table(arrow_table, filename.strpath, version="2.0")
     table_read = pq.read_table(filename.strpath)
     df_read = table_read.to_pandas()
     pdt.assert_frame_equal(df, df_read)
@@ -113,8 +116,8 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
         'empty_str': [''] * size
     })
     filename = tmpdir.join('pandas_rountrip.parquet')
-    arrow_table = A.Table.from_pandas(df)
-    A.parquet.write_table(arrow_table, filename.strpath, version="1.0")
+    arrow_table = pa.Table.from_pandas(df)
+    pq.write_table(arrow_table, filename.strpath, version="1.0")
     table_read = pq.read_table(filename.strpath)
     df_read = table_read.to_pandas()
 
@@ -133,28 +136,39 @@ def test_pandas_column_selection(tmpdir):
         'uint16': np.arange(size, dtype=np.uint16)
     })
     filename = tmpdir.join('pandas_rountrip.parquet')
-    arrow_table = A.Table.from_pandas(df)
-    A.parquet.write_table(arrow_table, filename.strpath)
+    arrow_table = pa.Table.from_pandas(df)
+    pq.write_table(arrow_table, filename.strpath)
     table_read = pq.read_table(filename.strpath, columns=['uint8'])
     df_read = table_read.to_pandas()
 
     pdt.assert_frame_equal(df[['uint8']], df_read)
 
 
-def _test_dataframe(size=10000):
-    np.random.seed(0)
+def _random_integers(size, dtype):
+    # We do not generate integers outside the int64 range
+    i64_info = np.iinfo('int64')
+    iinfo = np.iinfo(dtype)
+    return np.random.randint(max(iinfo.min, i64_info.min),
+                             min(iinfo.max, i64_info.max),
+                             size=size).astype(dtype)
+
+
+def _test_dataframe(size=10000, seed=0):
+    np.random.seed(seed)
     df = pd.DataFrame({
-        'uint8': np.arange(size, dtype=np.uint8),
-        'uint16': np.arange(size, dtype=np.uint16),
-        'uint32': np.arange(size, dtype=np.uint32),
-        'uint64': np.arange(size, dtype=np.uint64),
-        'int8': np.arange(size, dtype=np.int16),
-        'int16': np.arange(size, dtype=np.int16),
-        'int32': np.arange(size, dtype=np.int32),
-        'int64': np.arange(size, dtype=np.int64),
-        'float32': np.arange(size, dtype=np.float32),
+        'uint8': _random_integers(size, np.uint8),
+        'uint16': _random_integers(size, np.uint16),
+        'uint32': _random_integers(size, np.uint32),
+        'uint64': _random_integers(size, np.uint64),
+        'int8': _random_integers(size, np.int8),
+        'int16': _random_integers(size, np.int16),
+        'int32': _random_integers(size, np.int32),
+        'int64': _random_integers(size, np.int64),
+        'float32': np.random.randn(size).astype(np.float32),
+        'float64': np.random.randn(size),
         'float64': np.arange(size, dtype=np.float64),
-        'bool': np.random.randn(size) > 0
+        'bool': np.random.randn(size) > 0,
+        'strings': [pdt.rands(10) for i in range(size)]
     })
     return df
 
@@ -162,7 +176,7 @@ def _test_dataframe(size=10000):
 @parquet
 def test_pandas_parquet_native_file_roundtrip(tmpdir):
     df = _test_dataframe(10000)
-    arrow_table = A.Table.from_pandas(df)
+    arrow_table = pa.Table.from_pandas(df)
     imos = paio.InMemoryOutputStream()
     pq.write_table(arrow_table, imos, version="2.0")
     buf = imos.get_result()
@@ -183,10 +197,10 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir):
         'strings': ['foo', 'bar', None, 'baz', 'qux']
     })
 
-    arrow_table = A.Table.from_pandas(df)
+    arrow_table = pa.Table.from_pandas(df)
 
     with open(filename, 'wb') as f:
-        A.parquet.write_table(arrow_table, f, version="1.0")
+        pq.write_table(arrow_table, f, version="1.0")
 
     data = io.BytesIO(open(filename, 'rb').read())
 
@@ -213,31 +227,27 @@ def test_pandas_parquet_configuration_options(tmpdir):
         'bool': np.random.randn(size) > 0
     })
     filename = tmpdir.join('pandas_rountrip.parquet')
-    arrow_table = A.Table.from_pandas(df)
+    arrow_table = pa.Table.from_pandas(df)
 
     for use_dictionary in [True, False]:
-        A.parquet.write_table(
-                arrow_table,
-                filename.strpath,
-                version="2.0",
-                use_dictionary=use_dictionary)
+        pq.write_table(arrow_table, filename.strpath,
+                       version="2.0",
+                       use_dictionary=use_dictionary)
         table_read = pq.read_table(filename.strpath)
         df_read = table_read.to_pandas()
         pdt.assert_frame_equal(df, df_read)
 
     for compression in ['NONE', 'SNAPPY', 'GZIP']:
-        A.parquet.write_table(
-                arrow_table,
-                filename.strpath,
-                version="2.0",
-                compression=compression)
+        pq.write_table(arrow_table, filename.strpath,
+                       version="2.0",
+                       compression=compression)
         table_read = pq.read_table(filename.strpath)
         df_read = table_read.to_pandas()
         pdt.assert_frame_equal(df, df_read)
 
 
 def make_sample_file(df):
-    a_table = A.Table.from_pandas(df, timestamps_to_ms=True)
+    a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
 
     buf = io.BytesIO()
     pq.write_table(a_table, buf, compression='SNAPPY', version='2.0')
@@ -315,7 +325,7 @@ def test_pass_separate_metadata():
     # ARROW-471
     df = alltypes_sample(size=10000)
 
-    a_table = A.Table.from_pandas(df, timestamps_to_ms=True)
+    a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
 
     buf = io.BytesIO()
     pq.write_table(a_table, buf, compression='snappy', version='2.0')
@@ -328,3 +338,72 @@ def test_pass_separate_metadata():
     fileh = pq.ParquetFile(buf, metadata=metadata)
 
     pdt.assert_frame_equal(df, fileh.read().to_pandas())
+
+
+@parquet
+def test_read_multiple_files(tmpdir):
+    nfiles = 10
+    size = 5
+
+    dirpath = tmpdir.join(guid()).strpath
+    os.mkdir(dirpath)
+
+    test_data = []
+    paths = []
+    for i in range(nfiles):
+        df = _test_dataframe(size, seed=i)
+
+        # Hack so that we don't have a dtype cast in v1 files
+        df['uint32'] = df['uint32'].astype(np.int64)
+
+        path = pjoin(dirpath, '{0}.parquet'.format(i))
+
+        table = pa.Table.from_pandas(df)
+        pq.write_table(table, path)
+
+        test_data.append(table)
+        paths.append(path)
+
+    result = pq.read_multiple_files(paths)
+    expected = pa.concat_tables(test_data)
+
+    assert result.equals(expected)
+
+    # Read with provided metadata
+    metadata = pq.ParquetFile(paths[0]).metadata
+
+    result2 = pq.read_multiple_files(paths, metadata=metadata)
+    assert result2.equals(expected)
+
+    result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
+    assert result3.equals(expected)
+
+    # Read column subset
+    to_read = [result[0], result[3], result[6]]
+    result = pa.localfs.read_parquet(
+        dirpath, columns=[c.name for c in to_read])
+    expected = pa.Table.from_arrays(to_read)
+    assert result.equals(expected)
+
+    # Test failure modes with non-uniform metadata
+    bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
+    bad_apple_path = tmpdir.join('{0}.parquet'.format(guid())).strpath
+
+    t = pa.Table.from_pandas(bad_apple)
+    pq.write_table(t, bad_apple_path)
+
+    bad_meta = pq.ParquetFile(bad_apple_path).metadata
+
+    with pytest.raises(ValueError):
+        pq.read_multiple_files(paths + [bad_apple_path])
+
+    with pytest.raises(ValueError):
+        pq.read_multiple_files(paths, metadata=bad_meta)
+
+    mixed_paths = [bad_apple_path, paths[0]]
+
+    with pytest.raises(ValueError):
+        pq.read_multiple_files(mixed_paths, schema=bad_meta.schema)
+
+    with pytest.raises(ValueError):
+        pq.read_multiple_files(mixed_paths)

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_scalars.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_scalars.py b/python/pyarrow/tests/test_scalars.py
index 62e51f8..ef600a0 100644
--- a/python/pyarrow/tests/test_scalars.py
+++ b/python/pyarrow/tests/test_scalars.py
@@ -32,7 +32,7 @@ class TestScalars(unittest.TestCase):
         v = arr[0]
         assert isinstance(v, A.BooleanValue)
         assert repr(v) == "True"
-        assert v.as_py() == True
+        assert v.as_py() is True
 
         assert arr[1] is A.NA
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_schema.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py
index 4aa8112..507ebb8 100644
--- a/python/pyarrow/tests/test_schema.py
+++ b/python/pyarrow/tests/test_schema.py
@@ -85,4 +85,3 @@ baz: list<item: int8>"""
         del fields[-1]
         sch3 = A.schema(fields)
         assert not sch1.equals(sch3)
-

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 6f00c73..d49b33c 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -21,16 +21,43 @@ from pandas.util.testing import assert_frame_equal
 import pandas as pd
 import pytest
 
+from pyarrow.compat import unittest
 import pyarrow as pa
 
 
+class TestColumn(unittest.TestCase):
+
+    def test_basics(self):
+        data = [
+            pa.from_pylist([-10, -5, 0, 5, 10])
+        ]
+        table = pa.Table.from_arrays(data, names=['a'], name='table_name')
+        column = table.column(0)
+        assert column.name == 'a'
+        assert column.length() == 5
+        assert len(column) == 5
+        assert column.shape == (5,)
+        assert column.to_pylist() == [-10, -5, 0, 5, 10]
+
+    def test_pandas(self):
+        data = [
+            pa.from_pylist([-10, -5, 0, 5, 10])
+        ]
+        table = pa.Table.from_arrays(data, names=['a'], name='table_name')
+        column = table.column(0)
+        series = column.to_pandas()
+        assert series.name == 'a'
+        assert series.shape == (5,)
+        assert series.iloc[0] == -10
+
+
 def test_recordbatch_basics():
     data = [
         pa.from_pylist(range(5)),
         pa.from_pylist([-10, -5, 0, 5, 10])
     ]
 
-    batch = pa.RecordBatch.from_arrays(['c0', 'c1'], data)
+    batch = pa.RecordBatch.from_arrays(data, ['c0', 'c1'])
 
     assert len(batch) == 5
     assert batch.num_rows == 5
@@ -95,7 +122,7 @@ def test_table_basics():
         pa.from_pylist(range(5)),
         pa.from_pylist([-10, -5, 0, 5, 10])
     ]
-    table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
+    table = pa.Table.from_arrays(data, names=('a', 'b'), name='table_name')
     assert table.name == 'table_name'
     assert len(table) == 5
     assert table.num_rows == 5
@@ -121,19 +148,19 @@ def test_concat_tables():
         [1., 2., 3., 4., 5.]
     ]
 
-    t1 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x)
-                                           for x in data], 'table_name')
-    t2 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x)
-                                           for x in data2], 'table_name')
+    t1 = pa.Table.from_arrays([pa.from_pylist(x) for x in data],
+                              names=('a', 'b'), name='table_name')
+    t2 = pa.Table.from_arrays([pa.from_pylist(x) for x in data2],
+                              names=('a', 'b'), name='table_name')
 
     result = pa.concat_tables([t1, t2], output_name='foo')
     assert result.name == 'foo'
     assert len(result) == 10
 
-    expected = pa.Table.from_arrays(
-        ('a', 'b'), [pa.from_pylist(x + y)
-                     for x, y in zip(data, data2)],
-        'foo')
+    expected = pa.Table.from_arrays([pa.from_pylist(x + y)
+                                     for x, y in zip(data, data2)],
+                                    names=('a', 'b'),
+                                    name='foo')
 
     assert result.equals(expected)
 
@@ -143,7 +170,8 @@ def test_table_pandas():
         pa.from_pylist(range(5)),
         pa.from_pylist([-10, -5, 0, 5, 10])
     ]
-    table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
+    table = pa.Table.from_arrays(data, names=('a', 'b'),
+                                 name='table_name')
 
     # TODO: Use this part once from_pandas is implemented
     # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}

http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/util.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py
new file mode 100644
index 0000000..4b6a835
--- /dev/null
+++ b/python/pyarrow/util.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Miscellaneous utility code
+
+
+def implements(f):
+    def decorator(g):
+        g.__doc__ = f.__doc__
+        return g
+    return decorator