Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/12 17:05:27 UTC

arrow git commit: ARROW-539: [Python] Add support for reading partitioned Parquet files with Hive-like directory schemes

Repository: arrow
Updated Branches:
  refs/heads/master 9db96fea4 -> 9d532c49d


ARROW-539: [Python] Add support for reading partitioned Parquet files with Hive-like directory schemes

I probably didn't get all the use cases, but this should be a good start.

First, the directory structure is walked to determine the distinct partition keys. These keys later serve as the dictionaries for the `arrow::DictionaryArray` partition columns that get constructed when reading.
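
For example, given a Hive-style layout like the one below (the paths are
purely illustrative), the partition column comes back as a dictionary-encoded
column, and as a pandas Categorical after conversion:

    /dataset/foo=a/data.parq
    /dataset/foo=b/data.parq
    /dataset/foo=c/data.parq

    import pyarrow.parquet as pq

    dataset = pq.ParquetDataset('/dataset')
    table = dataset.read()     # appends a dictionary-encoded 'foo' column
    df = table.to_pandas()     # 'foo' becomes a pandas Categorical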

I also created the `ParquetDatasetPiece` class to enable distributed processing of file components in frameworks like Dask. We may need to address pickling of the `ParquetPartitions` object (which must be passed to `ParquetDatasetPiece.read` so the right array metadata can be constructed).
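
A rough sketch of the kind of distributed usage this is meant to enable (the
worker function and any scheduler plumbing are hypothetical; only the
ParquetDataset / ParquetDatasetPiece calls come from this patch):

    import pyarrow as pa
    import pyarrow.parquet as pq

    dataset = pq.ParquetDataset('/dataset')

    def read_piece(piece, partitions, columns=None):
        # Runs on a worker; `partitions` carries the dictionary metadata
        # needed to reconstruct the partition key columns
        return piece.read(columns=columns, partitions=partitions)

    # Each call could instead be submitted to a Dask scheduler, which is
    # where pickling of the ParquetPartitions object becomes relevant
    tables = [read_piece(piece, dataset.partitions)
              for piece in dataset.pieces]
    result = pa.concat_tables(tables)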

Author: Wes McKinney <we...@twosigma.com>
Author: Miki Tebeka <mi...@gmail.com>

Closes #529 from wesm/ARROW-539 and squashes the following commits:

a0451fa [Wes McKinney] Code review comments
deb6d82 [Wes McKinney] Don't make file-like Python object on LocalFilesystem
04dc691 [Wes McKinney] Complete initial partitioned reads, supporting unit tests. Expose arrow::Table::AddColumn
7d33755 [Wes McKinney] Untested draft of ParquetManifest for partitioned directory structures. Get test suite passing again
ba8825f [Wes McKinney] Prototyping
18fe639 [Wes McKinney] Refactoring, add ParquetDataset, ParquetDatasetPiece
016b445 [Miki Tebeka] [ARROW-539] [Python] Support reading Parquet datasets with standard partition directory schemes


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/9d532c49
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/9d532c49
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/9d532c49

Branch: refs/heads/master
Commit: 9d532c49d563ec22f73af3cc49549eb2e5cb6898
Parents: 9db96fe
Author: Wes McKinney <we...@twosigma.com>
Authored: Wed Apr 12 13:05:21 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed Apr 12 13:05:21 2017 -0400

----------------------------------------------------------------------
 python/pyarrow/filesystem.py         |  25 +-
 python/pyarrow/includes/libarrow.pxd |   2 +
 python/pyarrow/parquet.py            | 547 ++++++++++++++++++++++++++----
 python/pyarrow/table.pxd             |   1 +
 python/pyarrow/table.pyx             |  40 ++-
 python/pyarrow/tests/test_parquet.py | 156 +++++++--
 python/pyarrow/tests/test_table.py   |  31 ++
 7 files changed, 692 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index e820806..269cf1c 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -87,20 +87,10 @@ class Filesystem(object):
         -------
         table : pyarrow.Table
         """
-        from pyarrow.parquet import read_multiple_files
-
-        if self.isdir(path):
-            paths_to_read = []
-            for path in self.ls(path):
-                if path.endswith('parq') or path.endswith('parquet'):
-                    paths_to_read.append(path)
-        else:
-            paths_to_read = [path]
-
-        return read_multiple_files(paths_to_read, columns=columns,
-                                   filesystem=self, schema=schema,
-                                   metadata=metadata,
-                                   nthreads=nthreads)
+        from pyarrow.parquet import ParquetDataset
+        dataset = ParquetDataset(path, schema=schema, metadata=metadata,
+                                 filesystem=self)
+        return dataset.read(columns=columns, nthreads=nthreads)
 
 
 class LocalFilesystem(Filesystem):
@@ -117,6 +107,13 @@ class LocalFilesystem(Filesystem):
     def ls(self, path):
         return sorted(pjoin(path, x) for x in os.listdir(path))
 
+    @implements(Filesystem.mkdir)
+    def mkdir(self, path, create_parents=True):
+        if create_parents:
+            os.makedirs(path)
+        else:
+            os.mkdir(path)
+
     @implements(Filesystem.isdir)
     def isdir(self, path):
         return os.path.isdir(path)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 40dd837..ae2b45f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -291,6 +291,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CSchema] schema()
         shared_ptr[CColumn] column(int i)
 
+        CStatus AddColumn(int i, const shared_ptr[CColumn]& column,
+                          shared_ptr[CTable]* out)
         CStatus RemoveColumn(int i, shared_ptr[CTable]* out)
 
     cdef cppclass CTensor" arrow::Tensor":

http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index d95c3b3..f81b6c2 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -17,18 +17,23 @@
 
 import six
 
+import numpy as np
+
+from pyarrow.filesystem import LocalFilesystem
 from pyarrow._parquet import (ParquetReader, FileMetaData,  # noqa
                               RowGroupMetaData, Schema, ParquetWriter)
 import pyarrow._parquet as _parquet  # noqa
-from pyarrow.table import concat_tables
+import pyarrow.array as _array
+import pyarrow.table as _table
 
 
-EXCLUDED_PARQUET_PATHS = {'_metadata', '_common_metadata', '_SUCCESS'}
+# ----------------------------------------------------------------------
+# Reading a single Parquet file
 
 
 class ParquetFile(object):
     """
-    Open a Parquet binary file for reading
+    Reader interface for a single Parquet file
 
     Parameters
     ----------
@@ -72,7 +77,8 @@ class ParquetFile(object):
             Content of the row group as a table (of columns)
         """
         column_indices = self._get_column_indices(columns)
-        self.reader.set_num_threads(nthreads)
+        if nthreads is not None:
+            self.reader.set_num_threads(nthreads)
         return self.reader.read_row_group(i, column_indices=column_indices)
 
     def read(self, columns=None, nthreads=1):
@@ -93,7 +99,8 @@ class ParquetFile(object):
             Content of the file as a table (of columns)
         """
         column_indices = self._get_column_indices(columns)
-        self.reader.set_num_threads(nthreads)
+        if nthreads is not None:
+            self.reader.set_num_threads(nthreads)
         return self.reader.read_all(column_indices=column_indices)
 
     def _get_column_indices(self, column_names):
@@ -104,6 +111,463 @@ class ParquetFile(object):
                     for column in column_names]
 
 
+# ----------------------------------------------------------------------
+# Metadata container providing instructions about reading a single Parquet
+# file, possibly part of a partitioned dataset
+
+
+class ParquetDatasetPiece(object):
+    """
+    A single chunk of a potentially larger Parquet dataset to read. The
+    arguments indicate whether to read a single row group or all row groups,
+    and whether to add partition keys to the resulting pyarrow.Table
+
+    Parameters
+    ----------
+    path : str
+        Path to file in the file system where this piece is located
+    row_group : int, default None
+        Row group to load. By default, reads all row groups
+    partition_keys : list of tuples
+        [(column name, ordinal index)]
+    """
+
+    def __init__(self, path, row_group=None, partition_keys=None):
+        self.path = path
+        self.row_group = row_group
+        self.partition_keys = partition_keys or []
+
+    def __eq__(self, other):
+        if not isinstance(other, ParquetDatasetPiece):
+            return False
+        return (self.path == other.path and
+                self.row_group == other.row_group and
+                self.partition_keys == other.partition_keys)
+
+    def __ne__(self, other):
+        return not (self == other)
+
+    def __repr__(self):
+        return ('{0}({1!r}, row_group={2!r}, partition_keys={3!r})'
+                .format(type(self).__name__, self.path,
+                        self.row_group,
+                        self.partition_keys))
+
+    def __str__(self):
+        result = ''
+
+        if len(self.partition_keys) > 0:
+            partition_str = ', '.join('{0}={1}'.format(name, index)
+                                      for name, index in self.partition_keys)
+            result += 'partition[{0}] '.format(partition_str)
+
+        result += self.path
+
+        if self.row_group is not None:
+            result += ' | row_group={0}'.format(self.row_group)
+
+        return result
+
+    def get_metadata(self, open_file_func=None):
+        """
+        Given a function that can create an open ParquetFile object, return the
+        file's metadata
+        """
+        return self._open(open_file_func).metadata
+
+    def _open(self, open_file_func=None):
+        """
+        Returns instance of ParquetFile
+        """
+        if open_file_func is None:
+            def simple_opener(path):
+                return ParquetFile(path)
+            open_file_func = simple_opener
+        return open_file_func(self.path)
+
+    def read(self, columns=None, nthreads=1, partitions=None,
+             open_file_func=None):
+        """
+        Read this piece as a pyarrow.Table
+
+        Parameters
+        ----------
+        columns : list of column names, default None
+        nthreads : int, default 1
+            For multithreaded file reads
+        partitions : ParquetPartitions, default None
+        open_file_func : function, default None
+            A function that knows how to construct a ParquetFile object given
+            the file path in this piece
+
+        Returns
+        -------
+        table : pyarrow.Table
+        """
+        reader = self._open(open_file_func)
+
+        if self.row_group is not None:
+            table = reader.read_row_group(self.row_group, columns=columns,
+                                          nthreads=nthreads)
+        else:
+            table = reader.read(columns=columns, nthreads=nthreads)
+
+        if len(self.partition_keys) > 0:
+            if partitions is None:
+                raise ValueError('Must pass partition sets')
+
+            # Here, the index is the categorical code of the partition where
+            # this piece is located. Suppose we had
+            #
+            # /foo=a/0.parq
+            # /foo=b/0.parq
+            # /foo=c/0.parq
+            #
+            # Then we assign a=0, b=1, c=2. And the resulting Table pieces will
+            # have a DictionaryArray column named foo having the constant index
+            # value as indicated. The distinct categories of the partition have
+            # been computed in the ParquetManifest
+            for i, (name, index) in enumerate(self.partition_keys):
+                # The partition code is the same for all values in this piece
+                indices = np.array([index], dtype='i4').repeat(len(table))
+
+                # This is set of all partition values, computed as part of the
+                # manifest, so ['a', 'b', 'c'] as in our example above.
+                dictionary = partitions.levels[i].dictionary
+
+                arr = _array.DictionaryArray.from_arrays(indices, dictionary)
+                col = _table.Column.from_array(name, arr)
+                table = table.append_column(col)
+
+        return table
+
+
+def _is_parquet_file(path):
+    return path.endswith('parq') or path.endswith('parquet')
+
+
+class PartitionSet(object):
+    """A data structure for cataloguing the observed Parquet partitions at a
+    particular level. So if we have
+
+    /foo=a/bar=0
+    /foo=a/bar=1
+    /foo=a/bar=2
+    /foo=b/bar=0
+    /foo=b/bar=1
+    /foo=b/bar=2
+
+    Then we have two partition sets, one for foo, another for bar. As we visit
+    levels of the partition hierarchy, a PartitionSet tracks the distinct
+    values and assigns categorical codes to use when reading the pieces
+    """
+
+    def __init__(self, name, keys=None):
+        self.name = name
+        self.keys = keys or []
+        self.key_indices = {k: i for i, k in enumerate(self.keys)}
+        self._dictionary = None
+
+    def get_index(self, key):
+        """
+        Get the index of the partition value if it is known, otherwise assign
+        one
+        """
+        if key in self.key_indices:
+            return self.key_indices[key]
+        else:
+            index = len(self.key_indices)
+            self.keys.append(key)
+            self.key_indices[key] = index
+            return index
+
+    @property
+    def dictionary(self):
+        if self._dictionary is not None:
+            return self._dictionary
+
+        if len(self.keys) == 0:
+            raise ValueError('No known partition keys')
+
+        # Only integer and string partition types are supported right now
+        try:
+            integer_keys = [int(x) for x in self.keys]
+            dictionary = _array.from_pylist(integer_keys)
+        except ValueError:
+            dictionary = _array.from_pylist(self.keys)
+
+        self._dictionary = dictionary
+        return dictionary
+
+    @property
+    def is_sorted(self):
+        return list(self.keys) == sorted(self.keys)
+
+
+class ParquetPartitions(object):
+
+    def __init__(self):
+        self.levels = []
+        self.partition_names = set()
+
+    def __len__(self):
+        return len(self.levels)
+
+    def __getitem__(self, i):
+        return self.levels[i]
+
+    def get_index(self, level, name, key):
+        """
+        Record a partition value at a particular level, returning the distinct
+        code for that value at that level. Example:
+
+        partitions.get_index(1, 'foo', 'a') returns 0
+        partitions.get_index(1, 'foo', 'b') returns 1
+        partitions.get_index(1, 'foo', 'c') returns 2
+        partitions.get_index(1, 'foo', 'a') returns 0
+
+        Parameters
+        ----------
+        level : int
+            The nesting level of the partition we are observing
+        name : string
+            The partition name
+        key : string or int
+            The partition value
+        """
+        if level == len(self.levels):
+            if name in self.partition_names:
+                raise ValueError('{0} was the name of the partition in '
+                                 'another level'.format(name))
+
+            part_set = PartitionSet(name)
+            self.levels.append(part_set)
+            self.partition_names.add(name)
+
+        return self.levels[level].get_index(key)
+
+
+def is_string(x):
+    return isinstance(x, six.string_types)
+
+
+class ParquetManifest(object):
+    """
+    Walk a partitioned directory tree, collecting its pieces and partition keys
+    """
+    def __init__(self, dirpath, filesystem=None, pathsep='/',
+                 partition_scheme='hive'):
+        self.filesystem = filesystem or LocalFilesystem.get_instance()
+        self.pathsep = pathsep
+        self.dirpath = dirpath
+        self.partition_scheme = partition_scheme
+        self.partitions = ParquetPartitions()
+        self.pieces = []
+
+        self.common_metadata_path = None
+        self.metadata_path = None
+
+        self._visit_level(0, self.dirpath, [])
+
+    def _visit_level(self, level, base_path, part_keys):
+        directories = []
+        files = []
+        fs = self.filesystem
+
+        if not fs.isdir(base_path):
+            raise ValueError('"{0}" is not a directory'.format(base_path))
+
+        for path in sorted(fs.ls(base_path)):
+            if fs.isfile(path):
+                if _is_parquet_file(path):
+                    files.append(path)
+                elif path.endswith('_common_metadata'):
+                    self.common_metadata_path = path
+                elif path.endswith('_metadata'):
+                    self.metadata_path = path
+                elif not self._should_silently_exclude(path):
+                    print('Ignoring path: {0}'.format(path))
+            elif fs.isdir(path):
+                directories.append(path)
+
+        if len(files) > 0 and len(directories) > 0:
+            raise ValueError('Found files in an intermediate '
+                             'directory: {0}'.format(base_path))
+        elif len(directories) > 0:
+            self._visit_directories(level, directories, part_keys)
+        else:
+            self._push_pieces(files, part_keys)
+
+    def _should_silently_exclude(self, path):
+        _, tail = path.rsplit(self.pathsep, 1)
+        return tail.endswith('.crc') or tail in EXCLUDED_PARQUET_PATHS
+
+    def _visit_directories(self, level, directories, part_keys):
+        for path in directories:
+            head, tail = _path_split(path, self.pathsep)
+            name, key = self._parse_partition(tail)
+
+            index = self.partitions.get_index(level, name, key)
+            dir_part_keys = part_keys + [(name, index)]
+            self._visit_level(level + 1, path, dir_part_keys)
+
+    def _parse_partition(self, dirname):
+        if self.partition_scheme == 'hive':
+            return _parse_hive_partition(dirname)
+        else:
+            raise NotImplementedError('partition scheme: {0}'
+                                      .format(self.partition_scheme))
+
+    def _push_pieces(self, files, part_keys):
+        self.pieces.extend([
+            ParquetDatasetPiece(path, partition_keys=part_keys)
+            for path in files
+        ])
+
+
+def _parse_hive_partition(value):
+    if '=' not in value:
+        raise ValueError('Directory name did not appear to be a '
+                         'partition: {0}'.format(value))
+    return value.split('=', 1)
+
+
+def _path_split(path, sep):
+    i = path.rfind(sep) + 1
+    head, tail = path[:i], path[i:]
+    head = head.rstrip(sep)
+    return head, tail
+
+
+EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
+
+
+class ParquetDataset(object):
+    """
+    Encapsulates details of reading a complete Parquet dataset possibly
+    consisting of multiple files and partitions in subdirectories
+
+    Parameters
+    ----------
+    path_or_paths : str or List[str]
+        A directory name, single file name, or list of file names
+    filesystem : Filesystem, default None
+        If nothing passed, paths assumed to be found in the local on-disk
+        filesystem
+    metadata : pyarrow.parquet.FileMetaData
+        Use metadata obtained elsewhere to validate file schemas
+    schema : pyarrow.parquet.Schema
+        Use schema obtained elsewhere to validate file schemas. Alternative to
+        metadata parameter
+    split_row_groups : boolean, default False
+        Divide files into pieces for each row group in the file
+    validate_schema : boolean, default True
+        Check that individual file schemas are all the same / compatible
+    """
+    def __init__(self, path_or_paths, filesystem=None, schema=None,
+                 metadata=None, split_row_groups=False, validate_schema=True):
+        if filesystem is None:
+            self.fs = LocalFilesystem.get_instance()
+        else:
+            self.fs = filesystem
+
+        self.pieces, self.partitions = _make_manifest(path_or_paths, self.fs)
+
+        self.metadata = metadata
+        self.schema = schema
+
+        self.split_row_groups = split_row_groups
+
+        if split_row_groups:
+            raise NotImplementedError("split_row_groups not yet implemented")
+
+        if validate_schema:
+            self.validate_schemas()
+
+    def validate_schemas(self):
+        open_file = self._get_open_file_func()
+
+        if self.metadata is None and self.schema is None:
+            self.schema = self.pieces[0].get_metadata(open_file).schema
+        elif self.schema is None:
+            self.schema = self.metadata.schema
+
+        # Verify schemas are all equal
+        for piece in self.pieces:
+            file_metadata = piece.get_metadata(open_file)
+            if not self.schema.equals(file_metadata.schema):
+                raise ValueError('Schema in {0!s} was different. '
+                                 '{1!s} vs {2!s}'
+                                 .format(piece, file_metadata.schema,
+                                         self.schema))
+
+    def read(self, columns=None, nthreads=1):
+        """
+        Read multiple Parquet files as a single pyarrow.Table
+
+        Parameters
+        ----------
+        columns : List[str]
+            Names of columns to read from the file
+        nthreads : int, default 1
+            Number of columns to read in parallel. Requires that the underlying
+            file source is threadsafe
+
+        Returns
+        -------
+        pyarrow.Table
+            Content of the file as a table (of columns)
+        """
+        open_file = self._get_open_file_func()
+
+        tables = []
+        for piece in self.pieces:
+            table = piece.read(columns=columns, nthreads=nthreads,
+                               partitions=self.partitions,
+                               open_file_func=open_file)
+            tables.append(table)
+
+        all_data = _table.concat_tables(tables)
+        return all_data
+
+    def _get_open_file_func(self):
+        if self.fs is None or isinstance(self.fs, LocalFilesystem):
+            def open_file(path, meta=None):
+                return ParquetFile(path, metadata=meta)
+        else:
+            def open_file(path, meta=None):
+                return ParquetFile(self.fs.open(path, mode='rb'),
+                                   metadata=meta)
+        return open_file
+
+
+def _make_manifest(path_or_paths, fs, pathsep='/'):
+    partitions = None
+
+    if is_string(path_or_paths) and fs.isdir(path_or_paths):
+        manifest = ParquetManifest(path_or_paths, filesystem=fs,
+                                   pathsep=pathsep)
+        pieces = manifest.pieces
+        partitions = manifest.partitions
+    else:
+        if not isinstance(path_or_paths, list):
+            path_or_paths = [path_or_paths]
+
+        # List of paths
+        if len(path_or_paths) == 0:
+            raise ValueError('Must pass at least one file path')
+
+        pieces = []
+        for path in path_or_paths:
+            if not fs.isfile(path):
+                raise IOError('Passed non-file path: {0}'
+                              .format(path))
+            piece = ParquetDatasetPiece(path)
+            pieces.append(piece)
+
+    return pieces, partitions
+
+
 def read_table(source, columns=None, nthreads=1, metadata=None):
     """
     Read a Table from Parquet format
@@ -127,9 +591,7 @@ def read_table(source, columns=None, nthreads=1, metadata=None):
     pyarrow.Table
         Content of the file as a table (of columns)
     """
-    from pyarrow.filesystem import LocalFilesystem
-
-    if isinstance(source, six.string_types):
+    if is_string(source):
         fs = LocalFilesystem.get_instance()
         if fs.isdir(source):
             return fs.read_parquet(source, columns=columns,
@@ -139,70 +601,7 @@ def read_table(source, columns=None, nthreads=1, metadata=None):
     return pf.read(columns=columns, nthreads=nthreads)
 
 
-def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1,
-                        metadata=None, schema=None):
-    """
-    Read multiple Parquet files as a single pyarrow.Table
-
-    Parameters
-    ----------
-    paths : List[str]
-        List of file paths
-    columns : List[str]
-        Names of columns to read from the file
-    filesystem : Filesystem, default None
-        If nothing passed, paths assumed to be found in the local on-disk
-        filesystem
-    nthreads : int, default 1
-        Number of columns to read in parallel. Requires that the underlying
-        file source is threadsafe
-    metadata : pyarrow.parquet.FileMetaData
-        Use metadata obtained elsewhere to validate file schemas
-    schema : pyarrow.parquet.Schema
-        Use schema obtained elsewhere to validate file schemas. Alternative to
-        metadata parameter
-
-    Returns
-    -------
-    pyarrow.Table
-        Content of the file as a table (of columns)
-    """
-    if filesystem is None:
-        def open_file(path, meta=None):
-            return ParquetFile(path, metadata=meta)
-    else:
-        def open_file(path, meta=None):
-            return ParquetFile(filesystem.open(path, mode='rb'), metadata=meta)
-
-    if len(paths) == 0:
-        raise ValueError('Must pass at least one file path')
-
-    if metadata is None and schema is None:
-        schema = open_file(paths[0]).schema
-    elif schema is None:
-        schema = metadata.schema
-
-    # Verify schemas are all equal
-    all_file_metadata = []
-    for path in paths:
-        file_metadata = open_file(path).metadata
-        if not schema.equals(file_metadata.schema):
-            raise ValueError('Schema in {0} was different. {1!s} vs {2!s}'
-                             .format(path, file_metadata.schema, schema))
-        all_file_metadata.append(file_metadata)
-
-    # Read the tables
-    tables = []
-    for path, path_metadata in zip(paths, all_file_metadata):
-        reader = open_file(path, meta=path_metadata)
-        table = reader.read(columns=columns, nthreads=nthreads)
-        tables.append(table)
-
-    all_data = concat_tables(tables)
-    return all_data
-
-
-def write_table(table, sink, row_group_size=None, version='1.0',
+def write_table(table, where, row_group_size=None, version='1.0',
                 use_dictionary=True, compression='snappy', **kwargs):
     """
     Write a Table to Parquet format
@@ -210,7 +609,7 @@ def write_table(table, sink, row_group_size=None, version='1.0',
     Parameters
     ----------
     table : pyarrow.Table
-    sink: string or pyarrow.io.NativeFile
+    where: string or pyarrow.io.NativeFile
     row_group_size : int, default None
         The maximum number of rows in each Parquet RowGroup. As a default,
         we will write a single RowGroup per file.
@@ -223,7 +622,7 @@ def write_table(table, sink, row_group_size=None, version='1.0',
         Specify the compression codec, either on a general basis or per-column.
     """
     row_group_size = kwargs.get('chunk_size', row_group_size)
-    writer = ParquetWriter(sink, use_dictionary=use_dictionary,
+    writer = ParquetWriter(where, use_dictionary=use_dictionary,
                            compression=compression,
                            version=version)
     writer.write_table(table, row_group_size=row_group_size)
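
As a quick illustration of the partition bookkeeping above: ParquetPartitions
assigns a categorical code per distinct value at each directory level as it
is visited (the values below are made up, but the calls are the ones added in
this patch):

    import pyarrow.parquet as pq

    partitions = pq.ParquetPartitions()
    partitions.get_index(0, 'foo', 'a')    # -> 0
    partitions.get_index(0, 'foo', 'b')    # -> 1
    partitions.get_index(0, 'foo', 'a')    # -> 0 (already known at level 0)
    partitions.get_index(1, 'bar', '10')   # -> 0 (new level, new PartitionSet)

    partitions.levels[0].dictionary        # pyarrow array ['a', 'b']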

http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd
index 389727b..f564042 100644
--- a/python/pyarrow/table.pxd
+++ b/python/pyarrow/table.pxd
@@ -58,5 +58,6 @@ cdef class RecordBatch:
     cdef init(self, const shared_ptr[CRecordBatch]& table)
     cdef _check_nullptr(self)
 
+cdef object box_column(const shared_ptr[CColumn]& ccolumn)
 cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
 cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 94389a7..3972bda 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -30,8 +30,9 @@ import pyarrow.config
 from pyarrow.array cimport Array, box_array, wrap_array_output
 from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
-from pyarrow.schema cimport box_data_type, box_schema, DataType
+from pyarrow.schema cimport box_data_type, box_schema, DataType, Field
 
+from pyarrow.schema import field
 from pyarrow.compat import frombytes, tobytes
 
 cimport cpython
@@ -141,6 +142,19 @@ cdef class Column:
         self.sp_column = column
         self.column = column.get()
 
+    @staticmethod
+    def from_array(object field_or_name, Array arr):
+        cdef Field boxed_field
+
+        if isinstance(field_or_name, Field):
+            boxed_field = field_or_name
+        else:
+            boxed_field = field(field_or_name, arr.type)
+
+        cdef shared_ptr[CColumn] sp_column
+        sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
+        return box_column(sp_column)
+
     def to_pandas(self):
         """
         Convert the arrow::Column to a pandas.Series
@@ -828,6 +842,24 @@ cdef class Table:
         """
         return (self.num_rows, self.num_columns)
 
+    def add_column(self, int i, Column column):
+        """
+        Add column to Table at position. Returns new table
+        """
+        cdef:
+            shared_ptr[CTable] c_table
+
+        with nogil:
+            check_status(self.table.AddColumn(i, column.sp_column, &c_table))
+
+        return table_from_ctable(c_table)
+
+    def append_column(self, Column column):
+        """
+        Append column at end of columns. Returns new table
+        """
+        return self.add_column(self.num_columns, column)
+
     def remove_column(self, int i):
         """
         Create new Table with the indicated column removed
@@ -865,6 +897,12 @@ def concat_tables(tables):
     return table_from_ctable(c_result)
 
 
+cdef object box_column(const shared_ptr[CColumn]& ccolumn):
+    cdef Column column = Column()
+    column.init(ccolumn)
+    return column
+
+
 cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
     cdef Table table = Table()
     table.init(ctable)
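
For reference, the new Column / Table methods wrapped above can be used
directly from Python; a small sketch mirroring the unit tests further down:

    import pyarrow as pa

    arr = pa.from_pylist([1, 2, 3])
    col = pa.Column.from_array('d', arr)
    # equivalently: pa.Column.from_array(pa.field('d', arr.type), arr)

    table = pa.Table.from_arrays([arr], names=['a'])
    t2 = table.append_column(col)   # new Table with columns 'a', 'd'
    t3 = table.add_column(0, col)   # new Table with columns 'd', 'a'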

http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 86165be..de1b148 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -16,11 +16,13 @@
 # under the License.
 
 from os.path import join as pjoin
+import datetime
 import io
 import os
 import pytest
 
-from pyarrow.compat import guid
+from pyarrow.compat import guid, u
+from pyarrow.filesystem import LocalFilesystem
 import pyarrow as pa
 import pyarrow.io as paio
 from .pandas_examples import dataframe_with_arrays, dataframe_with_lists
@@ -28,7 +30,7 @@ from .pandas_examples import dataframe_with_arrays, dataframe_with_lists
 import numpy as np
 import pandas as pd
 
-import pandas.util.testing as pdt
+import pandas.util.testing as tm
 
 try:
     import pyarrow.parquet as pq
@@ -93,7 +95,7 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
     pq.write_table(arrow_table, filename.strpath, version="2.0")
     table_read = pq.read_table(filename.strpath)
     df_read = table_read.to_pandas()
-    pdt.assert_frame_equal(df, df_read)
+    tm.assert_frame_equal(df, df_read)
 
 
 @parquet
@@ -125,7 +127,7 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
     # We pass uint32_t as int64_t if we write Parquet version 1.0
     df['uint32'] = df['uint32'].values.astype(np.int64)
 
-    pdt.assert_frame_equal(df, df_read)
+    tm.assert_frame_equal(df, df_read)
 
 
 @parquet
@@ -142,7 +144,7 @@ def test_pandas_column_selection(tmpdir):
     table_read = pq.read_table(filename.strpath, columns=['uint8'])
     df_read = table_read.to_pandas()
 
-    pdt.assert_frame_equal(df[['uint8']], df_read)
+    tm.assert_frame_equal(df[['uint8']], df_read)
 
 
 def _random_integers(size, dtype):
@@ -169,7 +171,7 @@ def _test_dataframe(size=10000, seed=0):
         'float64': np.random.randn(size),
         'float64': np.arange(size, dtype=np.float64),
         'bool': np.random.randn(size) > 0,
-        'strings': [pdt.rands(10) for i in range(size)]
+        'strings': [tm.rands(10) for i in range(size)]
     })
     return df
 
@@ -183,7 +185,7 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
     buf = imos.get_result()
     reader = paio.BufferReader(buf)
     df_read = pq.read_table(reader).to_pandas()
-    pdt.assert_frame_equal(df, df_read)
+    tm.assert_frame_equal(df, df_read)
 
 
 @parquet
@@ -207,7 +209,7 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir):
 
     table_read = pq.read_table(data)
     df_read = table_read.to_pandas()
-    pdt.assert_frame_equal(df, df_read)
+    tm.assert_frame_equal(df, df_read)
 
 
 @parquet
@@ -236,7 +238,7 @@ def test_pandas_parquet_configuration_options(tmpdir):
                        use_dictionary=use_dictionary)
         table_read = pq.read_table(filename.strpath)
         df_read = table_read.to_pandas()
-        pdt.assert_frame_equal(df, df_read)
+        tm.assert_frame_equal(df, df_read)
 
     for compression in ['NONE', 'SNAPPY', 'GZIP']:
         pq.write_table(arrow_table, filename.strpath,
@@ -244,7 +246,7 @@ def test_pandas_parquet_configuration_options(tmpdir):
                        compression=compression)
         table_read = pq.read_table(filename.strpath)
         df_read = table_read.to_pandas()
-        pdt.assert_frame_equal(df, df_read)
+        tm.assert_frame_equal(df, df_read)
 
 
 def make_sample_file(df):
@@ -331,7 +333,7 @@ def test_column_of_arrays(tmpdir):
     pq.write_table(arrow_table, filename.strpath, version="2.0")
     table_read = pq.read_table(filename.strpath)
     df_read = table_read.to_pandas()
-    pdt.assert_frame_equal(df, df_read)
+    tm.assert_frame_equal(df, df_read)
 
 
 @parquet
@@ -344,7 +346,7 @@ def test_column_of_lists(tmpdir):
     pq.write_table(arrow_table, filename.strpath, version="2.0")
     table_read = pq.read_table(filename.strpath)
     df_read = table_read.to_pandas()
-    pdt.assert_frame_equal(df, df_read)
+    tm.assert_frame_equal(df, df_read)
 
 
 @parquet
@@ -399,7 +401,7 @@ def test_pass_separate_metadata():
 
     fileh = pq.ParquetFile(buf, metadata=metadata)
 
-    pdt.assert_frame_equal(df, fileh.read().to_pandas())
+    tm.assert_frame_equal(df, fileh.read().to_pandas())
 
 
 @parquet
@@ -422,13 +424,121 @@ def test_read_single_row_group():
 
     row_groups = [pf.read_row_group(i) for i in range(K)]
     result = pa.concat_tables(row_groups)
-    pdt.assert_frame_equal(df, result.to_pandas())
+    tm.assert_frame_equal(df, result.to_pandas())
 
     cols = df.columns[:2]
     row_groups = [pf.read_row_group(i, columns=cols)
                   for i in range(K)]
     result = pa.concat_tables(row_groups)
-    pdt.assert_frame_equal(df[cols], result.to_pandas())
+    tm.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@parquet
+def test_parquet_piece_basics():
+    path = '/baz.parq'
+
+    piece1 = pq.ParquetDatasetPiece(path)
+    piece2 = pq.ParquetDatasetPiece(path, row_group=1)
+    piece3 = pq.ParquetDatasetPiece(
+        path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])
+
+    assert str(piece1) == path
+    assert str(piece2) == '/baz.parq | row_group=1'
+    assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1'
+
+    assert piece1 == piece1
+    assert piece2 == piece2
+    assert piece3 == piece3
+    assert piece1 != piece3
+
+
+@parquet
+def test_partition_set_dictionary_type():
+    set1 = pq.PartitionSet('key1', [u('foo'), u('bar'), u('baz')])
+    set2 = pq.PartitionSet('key2', [2007, 2008, 2009])
+
+    assert isinstance(set1.dictionary, pa.StringArray)
+    assert isinstance(set2.dictionary, pa.IntegerArray)
+
+    set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
+    with pytest.raises(TypeError):
+        set3.dictionary
+
+
+@parquet
+def test_read_partitioned_directory(tmpdir):
+    foo_keys = [0, 1]
+    bar_keys = ['a', 'b', 'c']
+    partition_spec = [
+        ['foo', foo_keys],
+        ['bar', bar_keys]
+    ]
+    N = 30
+
+    df = pd.DataFrame({
+        'index': np.arange(N),
+        'foo': np.array(foo_keys, dtype='i4').repeat(15),
+        'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
+        'values': np.random.randn(N)
+    }, columns=['index', 'foo', 'bar', 'values'])
+
+    base_path = str(tmpdir)
+    _generate_partition_directories(base_path, partition_spec, df)
+
+    dataset = pq.ParquetDataset(base_path)
+    table = dataset.read()
+    result_df = (table.to_pandas()
+                 .sort_values(by='index')
+                 .reset_index(drop=True))
+
+    expected_df = (df.sort_values(by='index')
+                   .reset_index(drop=True)
+                   .reindex(columns=result_df.columns))
+    expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+    expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
+
+    assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
+
+    tm.assert_frame_equal(result_df, expected_df)
+
+
+def _generate_partition_directories(base_dir, partition_spec, df):
+    # partition_spec : list of lists, e.g. [['foo', [0, 1, 2]],
+    #                                       ['bar', ['a', 'b', 'c']]]
+    # df : a pandas DataFrame; the matching subset is written to each leaf
+    DEPTH = len(partition_spec)
+    fs = LocalFilesystem.get_instance()
+
+    def _visit_level(base_dir, level, part_keys):
+        name, values = partition_spec[level]
+        for value in values:
+            this_part_keys = part_keys + [(name, value)]
+
+            level_dir = pjoin(base_dir, '{0}={1}'.format(name, value))
+            fs.mkdir(level_dir)
+
+            if level == DEPTH - 1:
+                # Generate example data
+                file_path = pjoin(level_dir, 'data.parq')
+
+                filtered_df = _filter_partition(df, this_part_keys)
+                part_table = pa.Table.from_pandas(filtered_df)
+                pq.write_table(part_table, file_path)
+            else:
+                _visit_level(level_dir, level + 1, this_part_keys)
+
+    _visit_level(base_dir, 0, [])
+
+
+def _filter_partition(df, part_keys):
+    predicate = np.ones(len(df), dtype=bool)
+
+    to_drop = []
+    for name, value in part_keys:
+        to_drop.append(name)
+        predicate &= df[name] == value
+
+    return df[predicate].drop(to_drop, axis=1)
 
 
 @parquet
@@ -459,7 +569,11 @@ def test_read_multiple_files(tmpdir):
     with open(pjoin(dirpath, '_SUCCESS.crc'), 'wb') as f:
         f.write(b'0')
 
-    result = pq.read_multiple_files(paths)
+    def read_multiple_files(paths, columns=None, nthreads=None, **kwargs):
+        dataset = pq.ParquetDataset(paths, **kwargs)
+        return dataset.read(columns=columns, nthreads=nthreads)
+
+    result = read_multiple_files(paths)
     expected = pa.concat_tables(test_data)
 
     assert result.equals(expected)
@@ -467,7 +581,7 @@ def test_read_multiple_files(tmpdir):
     # Read with provided metadata
     metadata = pq.ParquetFile(paths[0]).metadata
 
-    result2 = pq.read_multiple_files(paths, metadata=metadata)
+    result2 = read_multiple_files(paths, metadata=metadata)
     assert result2.equals(expected)
 
     result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
@@ -493,15 +607,15 @@ def test_read_multiple_files(tmpdir):
     bad_meta = pq.ParquetFile(bad_apple_path).metadata
 
     with pytest.raises(ValueError):
-        pq.read_multiple_files(paths + [bad_apple_path])
+        read_multiple_files(paths + [bad_apple_path])
 
     with pytest.raises(ValueError):
-        pq.read_multiple_files(paths, metadata=bad_meta)
+        read_multiple_files(paths, metadata=bad_meta)
 
     mixed_paths = [bad_apple_path, paths[0]]
 
     with pytest.raises(ValueError):
-        pq.read_multiple_files(mixed_paths, schema=bad_meta.schema)
+        read_multiple_files(mixed_paths, schema=bad_meta.schema)
 
     with pytest.raises(ValueError):
-        pq.read_multiple_files(mixed_paths)
+        read_multiple_files(mixed_paths)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 548f478..79b4c15 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -39,6 +39,14 @@ class TestColumn(unittest.TestCase):
         assert column.shape == (5,)
         assert column.to_pylist() == [-10, -5, 0, 5, 10]
 
+    def test_from_array(self):
+        arr = pa.from_pylist([0, 1, 2, 3, 4])
+
+        col1 = pa.Column.from_array('foo', arr)
+        col2 = pa.Column.from_array(pa.field('foo', arr.type), arr)
+
+        assert col1.equals(col2)
+
     def test_pandas(self):
         data = [
             pa.from_pylist([-10, -5, 0, 5, 10])
@@ -169,6 +177,29 @@ def test_table_basics():
             assert chunk is not None
 
 
+def test_table_add_column():
+    data = [
+        pa.from_pylist(range(5)),
+        pa.from_pylist([-10, -5, 0, 5, 10]),
+        pa.from_pylist(range(5, 10))
+    ]
+    table = pa.Table.from_arrays(data, names=('a', 'b', 'c'))
+
+    col = pa.Column.from_array('d', data[1])
+    t2 = table.add_column(3, col)
+    t3 = table.append_column(col)
+
+    expected = pa.Table.from_arrays(data + [data[1]],
+                                    names=('a', 'b', 'c', 'd'))
+    assert t2.equals(expected)
+    assert t3.equals(expected)
+
+    t4 = table.add_column(0, col)
+    expected = pa.Table.from_arrays([data[1]] + data,
+                                    names=('d', 'a', 'b', 'c'))
+    assert t4.equals(expected)
+
+
 def test_table_remove_column():
     data = [
         pa.from_pylist(range(5)),