Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/12 17:05:27 UTC
arrow git commit: ARROW-539: [Python] Add support for reading partitioned Parquet files with Hive-like directory schemes
Repository: arrow
Updated Branches:
refs/heads/master 9db96fea4 -> 9d532c49d
ARROW-539: [Python] Add support for reading partitioned Parquet files with Hive-like directory schemes
I probably didn't get all the use cases, but this should be a good start.
First, the directory structure is walked to determine the distinct partition keys. These keys are later used as the dictionary for the `arrow::DictionaryArray` partition columns that are constructed when reading each piece.
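As a minimal sketch of the intended usage (the `/dataset` layout and the `values` column here are hypothetical, not from this patch):

```python
# Hypothetical Hive-style layout discovered by walking the directories:
#
#   /dataset/foo=0/bar=a/data.parq
#   /dataset/foo=0/bar=b/data.parq
#   /dataset/foo=1/bar=a/data.parq
#   /dataset/foo=1/bar=b/data.parq

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('/dataset')
table = dataset.read(columns=['values'], nthreads=4)

# The partition keys come back as dictionary-encoded columns, so they
# convert to pandas Categoricals
df = table.to_pandas()
```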
I also created the `ParquetDatasetPiece` class to enable distributed processing of file components in frameworks like Dask. We may need to address pickling of the `ParquetPartitions` object (which must be passed to `ParquetDatasetPiece.read` so that the right array metadata can be constructed).
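A rough sketch of how pieces could be farmed out to workers (shown serially here; the path and column name are again hypothetical):

```python
import pyarrow as pa
import pyarrow.parquet as pq

dataset = pq.ParquetDataset('/dataset')

def read_piece(piece, partitions):
    # Each worker reads one file (or one row group, once split_row_groups is
    # implemented); the shared ParquetPartitions object supplies the
    # dictionary metadata needed to attach the partition columns
    return piece.read(columns=['values'], partitions=partitions)

# In practice these calls would be submitted to a Dask scheduler
tables = [read_piece(piece, dataset.partitions) for piece in dataset.pieces]
result = pa.concat_tables(tables)
```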
Author: Wes McKinney <we...@twosigma.com>
Author: Miki Tebeka <mi...@gmail.com>
Closes #529 from wesm/ARROW-539 and squashes the following commits:
a0451fa [Wes McKinney] Code review comments
deb6d82 [Wes McKinney] Don't make file-like Python object on LocalFilesystem
04dc691 [Wes McKinney] Complete initial partitioned reads, supporting unit tests. Expose arrow::Table::AddColumn
7d33755 [Wes McKinney] Untested draft of ParquetManifest for partitioned directory structures. Get test suite passing again
ba8825f [Wes McKinney] Prototyping
18fe639 [Wes McKinney] Refactoring, add ParquetDataset, ParquetDatasetPiece
016b445 [Miki Tebeka] [ARROW-539] [Python] Support reading Parquet datasets with standard partition directory schemes
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/9d532c49
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/9d532c49
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/9d532c49
Branch: refs/heads/master
Commit: 9d532c49d563ec22f73af3cc49549eb2e5cb6898
Parents: 9db96fe
Author: Wes McKinney <we...@twosigma.com>
Authored: Wed Apr 12 13:05:21 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed Apr 12 13:05:21 2017 -0400
----------------------------------------------------------------------
python/pyarrow/filesystem.py | 25 +-
python/pyarrow/includes/libarrow.pxd | 2 +
python/pyarrow/parquet.py | 547 ++++++++++++++++++++++++++----
python/pyarrow/table.pxd | 1 +
python/pyarrow/table.pyx | 40 ++-
python/pyarrow/tests/test_parquet.py | 156 +++++++--
python/pyarrow/tests/test_table.py | 31 ++
7 files changed, 692 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index e820806..269cf1c 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -87,20 +87,10 @@ class Filesystem(object):
-------
table : pyarrow.Table
"""
- from pyarrow.parquet import read_multiple_files
-
- if self.isdir(path):
- paths_to_read = []
- for path in self.ls(path):
- if path.endswith('parq') or path.endswith('parquet'):
- paths_to_read.append(path)
- else:
- paths_to_read = [path]
-
- return read_multiple_files(paths_to_read, columns=columns,
- filesystem=self, schema=schema,
- metadata=metadata,
- nthreads=nthreads)
+ from pyarrow.parquet import ParquetDataset
+ dataset = ParquetDataset(path, schema=schema, metadata=metadata,
+ filesystem=self)
+ return dataset.read(columns=columns, nthreads=nthreads)
class LocalFilesystem(Filesystem):
@@ -117,6 +107,13 @@ class LocalFilesystem(Filesystem):
def ls(self, path):
return sorted(pjoin(path, x) for x in os.listdir(path))
+ @implements(Filesystem.mkdir)
+ def mkdir(self, path, create_parents=True):
+ if create_parents:
+ os.makedirs(path)
+ else:
+ os.mkdir(path)
+
@implements(Filesystem.isdir)
def isdir(self, path):
return os.path.isdir(path)
http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 40dd837..ae2b45f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -291,6 +291,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CSchema] schema()
shared_ptr[CColumn] column(int i)
+ CStatus AddColumn(int i, const shared_ptr[CColumn]& column,
+ shared_ptr[CTable]* out)
CStatus RemoveColumn(int i, shared_ptr[CTable]* out)
cdef cppclass CTensor" arrow::Tensor":
http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index d95c3b3..f81b6c2 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -17,18 +17,23 @@
import six
+import numpy as np
+
+from pyarrow.filesystem import LocalFilesystem
from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa
RowGroupMetaData, Schema, ParquetWriter)
import pyarrow._parquet as _parquet # noqa
-from pyarrow.table import concat_tables
+import pyarrow.array as _array
+import pyarrow.table as _table
-EXCLUDED_PARQUET_PATHS = {'_metadata', '_common_metadata', '_SUCCESS'}
+# ----------------------------------------------------------------------
+# Reading a single Parquet file
class ParquetFile(object):
"""
- Open a Parquet binary file for reading
+ Reader interface for a single Parquet file
Parameters
----------
@@ -72,7 +77,8 @@ class ParquetFile(object):
Content of the row group as a table (of columns)
"""
column_indices = self._get_column_indices(columns)
- self.reader.set_num_threads(nthreads)
+ if nthreads is not None:
+ self.reader.set_num_threads(nthreads)
return self.reader.read_row_group(i, column_indices=column_indices)
def read(self, columns=None, nthreads=1):
@@ -93,7 +99,8 @@ class ParquetFile(object):
Content of the file as a table (of columns)
"""
column_indices = self._get_column_indices(columns)
- self.reader.set_num_threads(nthreads)
+ if nthreads is not None:
+ self.reader.set_num_threads(nthreads)
return self.reader.read_all(column_indices=column_indices)
def _get_column_indices(self, column_names):
@@ -104,6 +111,463 @@ class ParquetFile(object):
for column in column_names]
+# ----------------------------------------------------------------------
+# Metadata container providing instructions about reading a single Parquet
+# file, possibly part of a partitioned dataset
+
+
+class ParquetDatasetPiece(object):
+ """
+ A single chunk of a potentially larger Parquet dataset to read. The
+ arguments will indicate to read either a single row group or all row
+ groups, and whether to add partition keys to the resulting pyarrow.Table
+
+ Parameters
+ ----------
+ path : str
+ Path to file in the file system where this piece is located
+ partition_keys : list of tuples
+ [(column name, ordinal index)]
+ row_group : int, default None
+ Row group to load. By default, reads all row groups
+ """
+
+ def __init__(self, path, row_group=None, partition_keys=None):
+ self.path = path
+ self.row_group = row_group
+ self.partition_keys = partition_keys or []
+
+ def __eq__(self, other):
+ if not isinstance(other, ParquetDatasetPiece):
+ return False
+ return (self.path == other.path and
+ self.row_group == other.row_group and
+ self.partition_keys == other.partition_keys)
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __repr__(self):
+ return ('{0}({1!r}, row_group={2!r}, partition_keys={3!r})'
+ .format(type(self).__name__, self.path,
+ self.row_group,
+ self.partition_keys))
+
+ def __str__(self):
+ result = ''
+
+ if len(self.partition_keys) > 0:
+ partition_str = ', '.join('{0}={1}'.format(name, index)
+ for name, index in self.partition_keys)
+ result += 'partition[{0}] '.format(partition_str)
+
+ result += self.path
+
+ if self.row_group is not None:
+ result += ' | row_group={0}'.format(self.row_group)
+
+ return result
+
+ def get_metadata(self, open_file_func=None):
+ """
+ Given a function that can create an open ParquetFile object, return the
+ file's metadata
+ """
+ return self._open(open_file_func).metadata
+
+ def _open(self, open_file_func=None):
+ """
+ Returns instance of ParquetFile
+ """
+ if open_file_func is None:
+ def simple_opener(path):
+ return ParquetFile(path)
+ open_file_func = simple_opener
+ return open_file_func(self.path)
+
+ def read(self, columns=None, nthreads=1, partitions=None,
+ open_file_func=None):
+ """
+ Read this piece as a pyarrow.Table
+
+ Parameters
+ ----------
+ columns : list of column names, default None
+ nthreads : int, default 1
+ For multithreaded file reads
+ partitions : ParquetPartitions, default None
+ open_file_func : function, default None
+ A function that knows how to construct a ParquetFile object given
+ the file path in this piece
+
+ Returns
+ -------
+ table : pyarrow.Table
+ """
+ reader = self._open(open_file_func)
+
+ if self.row_group is not None:
+ table = reader.read_row_group(self.row_group, columns=columns,
+ nthreads=nthreads)
+ else:
+ table = reader.read(columns=columns, nthreads=nthreads)
+
+ if len(self.partition_keys) > 0:
+ if partitions is None:
+ raise ValueError('Must pass partition sets')
+
+ # Here, the index is the categorical code of the partition where
+ # this piece is located. Suppose we had
+ #
+ # /foo=a/0.parq
+ # /foo=b/0.parq
+ # /foo=c/0.parq
+ #
+ # Then we assign a=0, b=1, c=2. And the resulting Table pieces will
+ # have a DictionaryArray column named foo having the constant index
+ # value as indicated. The distinct categories of the partition have
+ # been computed in the ParquetManifest
+ for i, (name, index) in enumerate(self.partition_keys):
+ # The partition code is the same for all values in this piece
+ indices = np.array([index], dtype='i4').repeat(len(table))
+
+ # This is set of all partition values, computed as part of the
+ # manifest, so ['a', 'b', 'c'] as in our example above.
+ dictionary = partitions.levels[i].dictionary
+
+ arr = _array.DictionaryArray.from_arrays(indices, dictionary)
+ col = _table.Column.from_array(name, arr)
+ table = table.append_column(col)
+
+ return table
+
+
+def _is_parquet_file(path):
+ return path.endswith('parq') or path.endswith('parquet')
+
+
+class PartitionSet(object):
+ """A data structure for cataloguing the observed Parquet partitions at a
+ particular level. So if we have
+
+ /foo=a/bar=0
+ /foo=a/bar=1
+ /foo=a/bar=2
+ /foo=b/bar=0
+ /foo=b/bar=1
+ /foo=b/bar=2
+
+ Then we have two partition sets, one for foo, another for bar. As we visit
+ levels of the partition hierarchy, a PartitionSet tracks the distinct
+ values and assigns categorical codes to use when reading the pieces
+ """
+
+ def __init__(self, name, keys=None):
+ self.name = name
+ self.keys = keys or []
+ self.key_indices = {k: i for i, k in enumerate(self.keys)}
+ self._dictionary = None
+
+ def get_index(self, key):
+ """
+ Get the index of the partition value if it is known, otherwise assign
+ one
+ """
+ if key in self.key_indices:
+ return self.key_indices[key]
+ else:
+ index = len(self.key_indices)
+ self.keys.append(key)
+ self.key_indices[key] = index
+ return index
+
+ @property
+ def dictionary(self):
+ if self._dictionary is not None:
+ return self._dictionary
+
+ if len(self.keys) == 0:
+ raise ValueError('No known partition keys')
+
+ # Only integer and string partition types are supported right now
+ try:
+ integer_keys = [int(x) for x in self.keys]
+ dictionary = _array.from_pylist(integer_keys)
+ except ValueError:
+ dictionary = _array.from_pylist(self.keys)
+
+ self._dictionary = dictionary
+ return dictionary
+
+ @property
+ def is_sorted(self):
+ return list(self.keys) == sorted(self.keys)
+
+
+class ParquetPartitions(object):
+
+ def __init__(self):
+ self.levels = []
+ self.partition_names = set()
+
+ def __len__(self):
+ return len(self.levels)
+
+ def __getitem__(self, i):
+ return self.levels[i]
+
+ def get_index(self, level, name, key):
+ """
+ Record a partition value at a particular level, returning the distinct
+ code for that value at that level. Example:
+
+ partitions.get_index(1, 'foo', 'a') returns 0
+ partitions.get_index(1, 'foo', 'b') returns 1
+ partitions.get_index(1, 'foo', 'c') returns 2
+ partitions.get_index(1, 'foo', 'a') returns 0
+
+ Parameters
+ ----------
+ level : int
+ The nesting level of the partition we are observing
+ name : string
+ The partition name
+ key : string or int
+ The partition value
+ """
+ if level == len(self.levels):
+ if name in self.partition_names:
+ raise ValueError('{0} was the name of the partition in '
+ 'another level'.format(name))
+
+ part_set = PartitionSet(name)
+ self.levels.append(part_set)
+ self.partition_names.add(name)
+
+ return self.levels[level].get_index(key)
+
+
+def is_string(x):
+ return isinstance(x, six.string_types)
+
+
+class ParquetManifest(object):
+ """
+
+ """
+ def __init__(self, dirpath, filesystem=None, pathsep='/',
+ partition_scheme='hive'):
+ self.filesystem = filesystem or LocalFilesystem.get_instance()
+ self.pathsep = pathsep
+ self.dirpath = dirpath
+ self.partition_scheme = partition_scheme
+ self.partitions = ParquetPartitions()
+ self.pieces = []
+
+ self.common_metadata_path = None
+ self.metadata_path = None
+
+ self._visit_level(0, self.dirpath, [])
+
+ def _visit_level(self, level, base_path, part_keys):
+ directories = []
+ files = []
+ fs = self.filesystem
+
+ if not fs.isdir(base_path):
+ raise ValueError('"{0}" is not a directory'.format(base_path))
+
+ for path in sorted(fs.ls(base_path)):
+ if fs.isfile(path):
+ if _is_parquet_file(path):
+ files.append(path)
+ elif path.endswith('_common_metadata'):
+ self.common_metadata_path = path
+ elif path.endswith('_metadata'):
+ self.metadata_path = path
+ elif not self._should_silently_exclude(path):
+ print('Ignoring path: {0}'.format(path))
+ elif fs.isdir(path):
+ directories.append(path)
+
+ if len(files) > 0 and len(directories) > 0:
+ raise ValueError('Found files in an intermediate '
+ 'directory: {0}'.format(base_path))
+ elif len(directories) > 0:
+ self._visit_directories(level, directories, part_keys)
+ else:
+ self._push_pieces(files, part_keys)
+
+ def _should_silently_exclude(self, path):
+ _, tail = path.rsplit(self.pathsep, 1)
+ return tail.endswith('.crc') or tail in EXCLUDED_PARQUET_PATHS
+
+ def _visit_directories(self, level, directories, part_keys):
+ for path in directories:
+ head, tail = _path_split(path, self.pathsep)
+ name, key = _parse_hive_partition(tail)
+
+ index = self.partitions.get_index(level, name, key)
+ dir_part_keys = part_keys + [(name, index)]
+ self._visit_level(level + 1, path, dir_part_keys)
+
+ def _parse_partition(self, dirname):
+ if self.partition_scheme == 'hive':
+ return _parse_hive_partition(dirname)
+ else:
+ raise NotImplementedError('partition schema: {0}'
+ .format(self.partition_scheme))
+
+ def _push_pieces(self, files, part_keys):
+ self.pieces.extend([
+ ParquetDatasetPiece(path, partition_keys=part_keys)
+ for path in files
+ ])
+
+
+def _parse_hive_partition(value):
+ if '=' not in value:
+ raise ValueError('Directory name did not appear to be a '
+ 'partition: {0}'.format(value))
+ return value.split('=', 1)
+
+
+def _path_split(path, sep):
+ i = path.rfind(sep) + 1
+ head, tail = path[:i], path[i:]
+ head = head.rstrip(sep)
+ return head, tail
+
+
+EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
+
+
+class ParquetDataset(object):
+ """
+ Encapsulates details of reading a complete Parquet dataset possibly
+ consisting of multiple files and partitions in subdirectories
+
+ Parameters
+ ----------
+ path_or_paths : str or List[str]
+ A directory name, single file name, or list of file names
+ filesystem : Filesystem, default None
+ If nothing passed, paths assumed to be found in the local on-disk
+ filesystem
+ metadata : pyarrow.parquet.FileMetaData
+ Use metadata obtained elsewhere to validate file schemas
+ schema : pyarrow.parquet.Schema
+ Use schema obtained elsewhere to validate file schemas. Alternative to
+ metadata parameter
+ split_row_groups : boolean, default False
+ Divide files into pieces for each row group in the file
+ validate_schema : boolean, default True
+ Check that individual file schemas are all the same / compatible
+ """
+ def __init__(self, path_or_paths, filesystem=None, schema=None,
+ metadata=None, split_row_groups=False, validate_schema=True):
+ if filesystem is None:
+ self.fs = LocalFilesystem.get_instance()
+ else:
+ self.fs = filesystem
+
+ self.pieces, self.partitions = _make_manifest(path_or_paths, self.fs)
+
+ self.metadata = metadata
+ self.schema = schema
+
+ self.split_row_groups = split_row_groups
+
+ if split_row_groups:
+ raise NotImplementedError("split_row_groups not yet implemented")
+
+ if validate_schema:
+ self.validate_schemas()
+
+ def validate_schemas(self):
+ open_file = self._get_open_file_func()
+
+ if self.metadata is None and self.schema is None:
+ self.schema = self.pieces[0].get_metadata(open_file).schema
+ elif self.schema is None:
+ self.schema = self.metadata.schema
+
+ # Verify schemas are all equal
+ for piece in self.pieces:
+ file_metadata = piece.get_metadata(open_file)
+ if not self.schema.equals(file_metadata.schema):
+ raise ValueError('Schema in {0!s} was different. '
+ '{1!s} vs {2!s}'
+ .format(piece, file_metadata.schema,
+ self.schema))
+
+ def read(self, columns=None, nthreads=1):
+ """
+ Read multiple Parquet files as a single pyarrow.Table
+
+ Parameters
+ ----------
+ columns : List[str]
+ Names of columns to read from the file
+ nthreads : int, default 1
+ Number of columns to read in parallel. Requires that the underlying
+ file source is threadsafe
+
+ Returns
+ -------
+ pyarrow.Table
+ Content of the file as a table (of columns)
+ """
+ open_file = self._get_open_file_func()
+
+ tables = []
+ for piece in self.pieces:
+ table = piece.read(columns=columns, nthreads=nthreads,
+ partitions=self.partitions,
+ open_file_func=open_file)
+ tables.append(table)
+
+ all_data = _table.concat_tables(tables)
+ return all_data
+
+ def _get_open_file_func(self):
+ if self.fs is None or isinstance(self.fs, LocalFilesystem):
+ def open_file(path, meta=None):
+ return ParquetFile(path, metadata=meta)
+ else:
+ def open_file(path, meta=None):
+ return ParquetFile(self.fs.open(path, mode='rb'),
+ metadata=meta)
+ return open_file
+
+
+def _make_manifest(path_or_paths, fs, pathsep='/'):
+ partitions = None
+
+ if is_string(path_or_paths) and fs.isdir(path_or_paths):
+ manifest = ParquetManifest(path_or_paths, filesystem=fs,
+ pathsep=pathsep)
+ pieces = manifest.pieces
+ partitions = manifest.partitions
+ else:
+ if not isinstance(path_or_paths, list):
+ path_or_paths = [path_or_paths]
+
+ # List of paths
+ if len(path_or_paths) == 0:
+ raise ValueError('Must pass at least one file path')
+
+ pieces = []
+ for path in path_or_paths:
+ if not fs.isfile(path):
+ raise IOError('Passed non-file path: {0}'
+ .format(path))
+ piece = ParquetDatasetPiece(path)
+ pieces.append(piece)
+
+ return pieces, partitions
+
+
def read_table(source, columns=None, nthreads=1, metadata=None):
"""
Read a Table from Parquet format
@@ -127,9 +591,7 @@ def read_table(source, columns=None, nthreads=1, metadata=None):
pyarrow.Table
Content of the file as a table (of columns)
"""
- from pyarrow.filesystem import LocalFilesystem
-
- if isinstance(source, six.string_types):
+ if is_string(source):
fs = LocalFilesystem.get_instance()
if fs.isdir(source):
return fs.read_parquet(source, columns=columns,
@@ -139,70 +601,7 @@ def read_table(source, columns=None, nthreads=1, metadata=None):
return pf.read(columns=columns, nthreads=nthreads)
-def read_multiple_files(paths, columns=None, filesystem=None, nthreads=1,
- metadata=None, schema=None):
- """
- Read multiple Parquet files as a single pyarrow.Table
-
- Parameters
- ----------
- paths : List[str]
- List of file paths
- columns : List[str]
- Names of columns to read from the file
- filesystem : Filesystem, default None
- If nothing passed, paths assumed to be found in the local on-disk
- filesystem
- nthreads : int, default 1
- Number of columns to read in parallel. Requires that the underlying
- file source is threadsafe
- metadata : pyarrow.parquet.FileMetaData
- Use metadata obtained elsewhere to validate file schemas
- schema : pyarrow.parquet.Schema
- Use schema obtained elsewhere to validate file schemas. Alternative to
- metadata parameter
-
- Returns
- -------
- pyarrow.Table
- Content of the file as a table (of columns)
- """
- if filesystem is None:
- def open_file(path, meta=None):
- return ParquetFile(path, metadata=meta)
- else:
- def open_file(path, meta=None):
- return ParquetFile(filesystem.open(path, mode='rb'), metadata=meta)
-
- if len(paths) == 0:
- raise ValueError('Must pass at least one file path')
-
- if metadata is None and schema is None:
- schema = open_file(paths[0]).schema
- elif schema is None:
- schema = metadata.schema
-
- # Verify schemas are all equal
- all_file_metadata = []
- for path in paths:
- file_metadata = open_file(path).metadata
- if not schema.equals(file_metadata.schema):
- raise ValueError('Schema in {0} was different. {1!s} vs {2!s}'
- .format(path, file_metadata.schema, schema))
- all_file_metadata.append(file_metadata)
-
- # Read the tables
- tables = []
- for path, path_metadata in zip(paths, all_file_metadata):
- reader = open_file(path, meta=path_metadata)
- table = reader.read(columns=columns, nthreads=nthreads)
- tables.append(table)
-
- all_data = concat_tables(tables)
- return all_data
-
-
-def write_table(table, sink, row_group_size=None, version='1.0',
+def write_table(table, where, row_group_size=None, version='1.0',
use_dictionary=True, compression='snappy', **kwargs):
"""
Write a Table to Parquet format
@@ -210,7 +609,7 @@ def write_table(table, sink, row_group_size=None, version='1.0',
Parameters
----------
table : pyarrow.Table
- sink: string or pyarrow.io.NativeFile
+ where: string or pyarrow.io.NativeFile
row_group_size : int, default None
The maximum number of rows in each Parquet RowGroup. As a default,
we will write a single RowGroup per file.
@@ -223,7 +622,7 @@ def write_table(table, sink, row_group_size=None, version='1.0',
Specify the compression codec, either on a general basis or per-column.
"""
row_group_size = kwargs.get('chunk_size', row_group_size)
- writer = ParquetWriter(sink, use_dictionary=use_dictionary,
+ writer = ParquetWriter(where, use_dictionary=use_dictionary,
compression=compression,
version=version)
writer.write_table(table, row_group_size=row_group_size)
http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd
index 389727b..f564042 100644
--- a/python/pyarrow/table.pxd
+++ b/python/pyarrow/table.pxd
@@ -58,5 +58,6 @@ cdef class RecordBatch:
cdef init(self, const shared_ptr[CRecordBatch]& table)
cdef _check_nullptr(self)
+cdef object box_column(const shared_ptr[CColumn]& ccolumn)
cdef api object table_from_ctable(const shared_ptr[CTable]& ctable)
cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch)
http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 94389a7..3972bda 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -30,8 +30,9 @@ import pyarrow.config
from pyarrow.array cimport Array, box_array, wrap_array_output
from pyarrow.error import ArrowException
from pyarrow.error cimport check_status
-from pyarrow.schema cimport box_data_type, box_schema, DataType
+from pyarrow.schema cimport box_data_type, box_schema, DataType, Field
+from pyarrow.schema import field
from pyarrow.compat import frombytes, tobytes
cimport cpython
@@ -141,6 +142,19 @@ cdef class Column:
self.sp_column = column
self.column = column.get()
+ @staticmethod
+ def from_array(object field_or_name, Array arr):
+ cdef Field boxed_field
+
+ if isinstance(field_or_name, Field):
+ boxed_field = field_or_name
+ else:
+ boxed_field = field(field_or_name, arr.type)
+
+ cdef shared_ptr[CColumn] sp_column
+ sp_column.reset(new CColumn(boxed_field.sp_field, arr.sp_array))
+ return box_column(sp_column)
+
def to_pandas(self):
"""
Convert the arrow::Column to a pandas.Series
@@ -828,6 +842,24 @@ cdef class Table:
"""
return (self.num_rows, self.num_columns)
+ def add_column(self, int i, Column column):
+ """
+ Add column to Table at position. Returns new table
+ """
+ cdef:
+ shared_ptr[CTable] c_table
+
+ with nogil:
+ check_status(self.table.AddColumn(i, column.sp_column, &c_table))
+
+ return table_from_ctable(c_table)
+
+ def append_column(self, Column column):
+ """
+ Append column at end of columns. Returns new table
+ """
+ return self.add_column(self.num_columns, column)
+
def remove_column(self, int i):
"""
Create new Table with the indicated column removed
@@ -865,6 +897,12 @@ def concat_tables(tables):
return table_from_ctable(c_result)
+cdef object box_column(const shared_ptr[CColumn]& ccolumn):
+ cdef Column column = Column()
+ column.init(ccolumn)
+ return column
+
+
cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
cdef Table table = Table()
table.init(ctable)
http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 86165be..de1b148 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -16,11 +16,13 @@
# under the License.
from os.path import join as pjoin
+import datetime
import io
import os
import pytest
-from pyarrow.compat import guid
+from pyarrow.compat import guid, u
+from pyarrow.filesystem import LocalFilesystem
import pyarrow as pa
import pyarrow.io as paio
from .pandas_examples import dataframe_with_arrays, dataframe_with_lists
@@ -28,7 +30,7 @@ from .pandas_examples import dataframe_with_arrays, dataframe_with_lists
import numpy as np
import pandas as pd
-import pandas.util.testing as pdt
+import pandas.util.testing as tm
try:
import pyarrow.parquet as pq
@@ -93,7 +95,7 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
pq.write_table(arrow_table, filename.strpath, version="2.0")
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
- pdt.assert_frame_equal(df, df_read)
+ tm.assert_frame_equal(df, df_read)
@parquet
@@ -125,7 +127,7 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
# We pass uint32_t as int64_t if we write Parquet version 1.0
df['uint32'] = df['uint32'].values.astype(np.int64)
- pdt.assert_frame_equal(df, df_read)
+ tm.assert_frame_equal(df, df_read)
@parquet
@@ -142,7 +144,7 @@ def test_pandas_column_selection(tmpdir):
table_read = pq.read_table(filename.strpath, columns=['uint8'])
df_read = table_read.to_pandas()
- pdt.assert_frame_equal(df[['uint8']], df_read)
+ tm.assert_frame_equal(df[['uint8']], df_read)
def _random_integers(size, dtype):
@@ -169,7 +171,7 @@ def _test_dataframe(size=10000, seed=0):
'float64': np.random.randn(size),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0,
- 'strings': [pdt.rands(10) for i in range(size)]
+ 'strings': [tm.rands(10) for i in range(size)]
})
return df
@@ -183,7 +185,7 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
buf = imos.get_result()
reader = paio.BufferReader(buf)
df_read = pq.read_table(reader).to_pandas()
- pdt.assert_frame_equal(df, df_read)
+ tm.assert_frame_equal(df, df_read)
@parquet
@@ -207,7 +209,7 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir):
table_read = pq.read_table(data)
df_read = table_read.to_pandas()
- pdt.assert_frame_equal(df, df_read)
+ tm.assert_frame_equal(df, df_read)
@parquet
@@ -236,7 +238,7 @@ def test_pandas_parquet_configuration_options(tmpdir):
use_dictionary=use_dictionary)
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
- pdt.assert_frame_equal(df, df_read)
+ tm.assert_frame_equal(df, df_read)
for compression in ['NONE', 'SNAPPY', 'GZIP']:
pq.write_table(arrow_table, filename.strpath,
@@ -244,7 +246,7 @@ def test_pandas_parquet_configuration_options(tmpdir):
compression=compression)
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
- pdt.assert_frame_equal(df, df_read)
+ tm.assert_frame_equal(df, df_read)
def make_sample_file(df):
@@ -331,7 +333,7 @@ def test_column_of_arrays(tmpdir):
pq.write_table(arrow_table, filename.strpath, version="2.0")
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
- pdt.assert_frame_equal(df, df_read)
+ tm.assert_frame_equal(df, df_read)
@parquet
@@ -344,7 +346,7 @@ def test_column_of_lists(tmpdir):
pq.write_table(arrow_table, filename.strpath, version="2.0")
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
- pdt.assert_frame_equal(df, df_read)
+ tm.assert_frame_equal(df, df_read)
@parquet
@@ -399,7 +401,7 @@ def test_pass_separate_metadata():
fileh = pq.ParquetFile(buf, metadata=metadata)
- pdt.assert_frame_equal(df, fileh.read().to_pandas())
+ tm.assert_frame_equal(df, fileh.read().to_pandas())
@parquet
@@ -422,13 +424,121 @@ def test_read_single_row_group():
row_groups = [pf.read_row_group(i) for i in range(K)]
result = pa.concat_tables(row_groups)
- pdt.assert_frame_equal(df, result.to_pandas())
+ tm.assert_frame_equal(df, result.to_pandas())
cols = df.columns[:2]
row_groups = [pf.read_row_group(i, columns=cols)
for i in range(K)]
result = pa.concat_tables(row_groups)
- pdt.assert_frame_equal(df[cols], result.to_pandas())
+ tm.assert_frame_equal(df[cols], result.to_pandas())
+
+
+@parquet
+def test_parquet_piece_basics():
+ path = '/baz.parq'
+
+ piece1 = pq.ParquetDatasetPiece(path)
+ piece2 = pq.ParquetDatasetPiece(path, row_group=1)
+ piece3 = pq.ParquetDatasetPiece(
+ path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])
+
+ assert str(piece1) == path
+ assert str(piece2) == '/baz.parq | row_group=1'
+ assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1'
+
+ assert piece1 == piece1
+ assert piece2 == piece2
+ assert piece3 == piece3
+ assert piece1 != piece3
+
+
+@parquet
+def test_partition_set_dictionary_type():
+ set1 = pq.PartitionSet('key1', [u('foo'), u('bar'), u('baz')])
+ set2 = pq.PartitionSet('key2', [2007, 2008, 2009])
+
+ assert isinstance(set1.dictionary, pa.StringArray)
+ assert isinstance(set2.dictionary, pa.IntegerArray)
+
+ set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
+ with pytest.raises(TypeError):
+ set3.dictionary
+
+
+@parquet
+def test_read_partitioned_directory(tmpdir):
+ foo_keys = [0, 1]
+ bar_keys = ['a', 'b', 'c']
+ partition_spec = [
+ ['foo', foo_keys],
+ ['bar', bar_keys]
+ ]
+ N = 30
+
+ df = pd.DataFrame({
+ 'index': np.arange(N),
+ 'foo': np.array(foo_keys, dtype='i4').repeat(15),
+ 'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
+ 'values': np.random.randn(N)
+ }, columns=['index', 'foo', 'bar', 'values'])
+
+ base_path = str(tmpdir)
+ _generate_partition_directories(base_path, partition_spec, df)
+
+ dataset = pq.ParquetDataset(base_path)
+ table = dataset.read()
+ result_df = (table.to_pandas()
+ .sort_values(by='index')
+ .reset_index(drop=True))
+
+ expected_df = (df.sort_values(by='index')
+ .reset_index(drop=True)
+ .reindex(columns=result_df.columns))
+ expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+ expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
+
+ assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
+
+ tm.assert_frame_equal(result_df, expected_df)
+
+
+def _generate_partition_directories(base_dir, partition_spec, df):
+ # partition_spec : list of lists, e.g. [['foo', [0, 1, 2],
+ # ['bar', ['a', 'b', 'c']]
+ # part_table : a pyarrow.Table to write to each partition
+ DEPTH = len(partition_spec)
+ fs = LocalFilesystem.get_instance()
+
+ def _visit_level(base_dir, level, part_keys):
+ name, values = partition_spec[level]
+ for value in values:
+ this_part_keys = part_keys + [(name, value)]
+
+ level_dir = pjoin(base_dir, '{0}={1}'.format(name, value))
+ fs.mkdir(level_dir)
+
+ if level == DEPTH - 1:
+ # Generate example data
+ file_path = pjoin(level_dir, 'data.parq')
+
+ filtered_df = _filter_partition(df, this_part_keys)
+ part_table = pa.Table.from_pandas(filtered_df)
+ pq.write_table(part_table, file_path)
+ else:
+ _visit_level(level_dir, level + 1, this_part_keys)
+
+ _visit_level(base_dir, 0, [])
+
+
+def _filter_partition(df, part_keys):
+ predicate = np.ones(len(df), dtype=bool)
+
+ to_drop = []
+ for name, value in part_keys:
+ to_drop.append(name)
+ predicate &= df[name] == value
+
+ return df[predicate].drop(to_drop, axis=1)
@parquet
@@ -459,7 +569,11 @@ def test_read_multiple_files(tmpdir):
with open(pjoin(dirpath, '_SUCCESS.crc'), 'wb') as f:
f.write(b'0')
- result = pq.read_multiple_files(paths)
+ def read_multiple_files(paths, columns=None, nthreads=None, **kwargs):
+ dataset = pq.ParquetDataset(paths, **kwargs)
+ return dataset.read(columns=columns, nthreads=nthreads)
+
+ result = read_multiple_files(paths)
expected = pa.concat_tables(test_data)
assert result.equals(expected)
@@ -467,7 +581,7 @@ def test_read_multiple_files(tmpdir):
# Read with provided metadata
metadata = pq.ParquetFile(paths[0]).metadata
- result2 = pq.read_multiple_files(paths, metadata=metadata)
+ result2 = read_multiple_files(paths, metadata=metadata)
assert result2.equals(expected)
result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
@@ -493,15 +607,15 @@ def test_read_multiple_files(tmpdir):
bad_meta = pq.ParquetFile(bad_apple_path).metadata
with pytest.raises(ValueError):
- pq.read_multiple_files(paths + [bad_apple_path])
+ read_multiple_files(paths + [bad_apple_path])
with pytest.raises(ValueError):
- pq.read_multiple_files(paths, metadata=bad_meta)
+ read_multiple_files(paths, metadata=bad_meta)
mixed_paths = [bad_apple_path, paths[0]]
with pytest.raises(ValueError):
- pq.read_multiple_files(mixed_paths, schema=bad_meta.schema)
+ read_multiple_files(mixed_paths, schema=bad_meta.schema)
with pytest.raises(ValueError):
- pq.read_multiple_files(mixed_paths)
+ read_multiple_files(mixed_paths)
http://git-wip-us.apache.org/repos/asf/arrow/blob/9d532c49/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 548f478..79b4c15 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -39,6 +39,14 @@ class TestColumn(unittest.TestCase):
assert column.shape == (5,)
assert column.to_pylist() == [-10, -5, 0, 5, 10]
+ def test_from_array(self):
+ arr = pa.from_pylist([0, 1, 2, 3, 4])
+
+ col1 = pa.Column.from_array('foo', arr)
+ col2 = pa.Column.from_array(pa.field('foo', arr.type), arr)
+
+ assert col1.equals(col2)
+
def test_pandas(self):
data = [
pa.from_pylist([-10, -5, 0, 5, 10])
@@ -169,6 +177,29 @@ def test_table_basics():
assert chunk is not None
+def test_table_add_column():
+ data = [
+ pa.from_pylist(range(5)),
+ pa.from_pylist([-10, -5, 0, 5, 10]),
+ pa.from_pylist(range(5, 10))
+ ]
+ table = pa.Table.from_arrays(data, names=('a', 'b', 'c'))
+
+ col = pa.Column.from_array('d', data[1])
+ t2 = table.add_column(3, col)
+ t3 = table.append_column(col)
+
+ expected = pa.Table.from_arrays(data + [data[1]],
+ names=('a', 'b', 'c', 'd'))
+ assert t2.equals(expected)
+ assert t3.equals(expected)
+
+ t4 = table.add_column(0, col)
+ expected = pa.Table.from_arrays([data[1]] + data,
+ names=('d', 'a', 'b', 'c'))
+ assert t4.equals(expected)
+
+
def test_table_remove_column():
data = [
pa.from_pylist(range(5)),