Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/23 11:41:43 UTC
arrow git commit: ARROW-475: [Python] Add support for reading multiple Parquet files as a single pyarrow.Table
Repository: arrow
Updated Branches:
refs/heads/master 5a161ebc1 -> 53a478dfb
ARROW-475: [Python] Add support for reading multiple Parquet files as a single pyarrow.Table
Also fixes a serious bug in which the data source passed to the ParquetReader was being garbage-collected prematurely
Also implements ARROW-470
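For illustration, a minimal usage sketch of the API this patch adds (the file
paths below are hypothetical, not part of the commit):

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Read every Parquet file in a local directory as one pyarrow.Table;
    # pa.localfs is the LocalFilesystem instance this patch exports.
    table = pa.localfs.read_parquet('/data/parquet_dir')

    # Or read an explicit list of files, optionally a column subset
    table = pq.read_multiple_files(['/data/f0.parquet', '/data/f1.parquet'],
                                   columns=['a', 'b'])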
Author: Wes McKinney <we...@twosigma.com>
Closes #296 from wesm/ARROW-475 and squashes the following commits:
894d2a2 [Wes McKinney] Implement Filesystem abstraction, add Filesystem.read_parquet. Implement rudimentary shim on local filesystem
3927c2c [Wes McKinney] Test read multiple Parquet from HDFS, fix premature garbage collection error
4904b3b [Wes McKinney] Implement read_multiple_files function for multiple Parquet files as a single Arrow table
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/53a478df
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/53a478df
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/53a478df
Branch: refs/heads/master
Commit: 53a478dfb278dcae5ca7f300b70857662553d118
Parents: 5a161eb
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jan 23 06:41:35 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jan 23 06:41:35 2017 -0500
----------------------------------------------------------------------
python/pyarrow/__init__.py | 6 +-
python/pyarrow/_parquet.pyx | 3 +
python/pyarrow/filesystem.py | 186 ++++++++++++++++++++++
python/pyarrow/includes/libarrow_io.pxd | 2 +
python/pyarrow/io.pyx | 62 +++-----
python/pyarrow/parquet.py | 88 ++++++++--
python/pyarrow/table.pyx | 60 ++++---
python/pyarrow/tests/test_column.py | 49 ------
python/pyarrow/tests/test_convert_builtin.py | 3 +-
python/pyarrow/tests/test_convert_pandas.py | 8 +-
python/pyarrow/tests/test_hdfs.py | 46 +++++-
python/pyarrow/tests/test_parquet.py | 155 +++++++++++++-----
python/pyarrow/tests/test_scalars.py | 2 +-
python/pyarrow/tests/test_schema.py | 1 -
python/pyarrow/tests/test_table.py | 50 ++++--
python/pyarrow/util.py | 25 +++
16 files changed, 568 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index efffbf2..d563c7a 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -42,7 +42,8 @@ from pyarrow.array import (Array,
from pyarrow.error import ArrowException
-from pyarrow.io import (HdfsClient, HdfsFile, NativeFile, PythonFileInterface,
+from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
+from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface,
Buffer, InMemoryOutputStream, BufferReader)
from pyarrow.scalar import (ArrayValue, Scalar, NA, NAType,
@@ -61,3 +62,6 @@ from pyarrow.schema import (null, bool_,
DataType, Field, Schema, schema)
from pyarrow.table import Column, RecordBatch, Table, concat_tables
+
+
+localfs = LocalFilesystem()
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 867fc4c..b11cee3 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -341,6 +341,7 @@ cdef logical_type_name_from_enum(ParquetLogicalType type_):
cdef class ParquetReader:
cdef:
+ object source
MemoryPool* allocator
unique_ptr[FileReader] reader
column_idx_map
@@ -360,6 +361,8 @@ cdef class ParquetReader:
if metadata is not None:
c_metadata = metadata.sp_metadata
+ self.source = source
+
get_reader(source, &rd_handle)
with nogil:
check_status(OpenFile(rd_handle, self.allocator, properties,
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
new file mode 100644
index 0000000..82409b7
--- /dev/null
+++ b/python/pyarrow/filesystem.py
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from os.path import join as pjoin
+import os
+
+from pyarrow.util import implements
+import pyarrow.io as io
+
+
+class Filesystem(object):
+ """
+ Abstract filesystem interface
+ """
+ def ls(self, path):
+ """
+ Return list of file paths
+ """
+ raise NotImplementedError
+
+ def delete(self, path, recursive=False):
+ """
+ Delete the indicated file or directory
+
+ Parameters
+ ----------
+ path : string
+ recursive : boolean, default False
+ If True, also delete child paths for directories
+ """
+ raise NotImplementedError
+
+ def mkdir(self, path, create_parents=True):
+ raise NotImplementedError
+
+ def exists(self, path):
+ raise NotImplementedError
+
+ def isdir(self, path):
+ """
+ Return True if path is a directory
+ """
+ raise NotImplementedError
+
+ def isfile(self, path):
+ """
+ Return True if path is a file
+ """
+ raise NotImplementedError
+
+ def read_parquet(self, path, columns=None, schema=None):
+ """
+ Read Parquet data from path in file system. Can read from a single file
+ or a directory of files
+
+ Parameters
+ ----------
+ path : str
+ Single file path or directory
+ columns : List[str], optional
+ Subset of columns to read
+ schema : pyarrow.parquet.Schema
+ Known schema to validate files against
+
+ Returns
+ -------
+ table : pyarrow.Table
+ """
+ from pyarrow.parquet import read_multiple_files
+
+ if self.isdir(path):
+ paths_to_read = []
+ for path in self.ls(path):
+ if path == '_metadata' or path == '_common_metadata':
+ raise ValueError('No support yet for common metadata file')
+ paths_to_read.append(path)
+ else:
+ paths_to_read = [path]
+
+ return read_multiple_files(paths_to_read, columns=columns,
+ filesystem=self, schema=schema)
+
+
+class LocalFilesystem(Filesystem):
+
+ @implements(Filesystem.ls)
+ def ls(self, path):
+ return sorted(pjoin(path, x) for x in os.listdir(path))
+
+ @implements(Filesystem.isdir)
+ def isdir(self, path):
+ return os.path.isdir(path)
+
+ @implements(Filesystem.isfile)
+ def isfile(self, path):
+ return os.path.isfile(path)
+
+ @implements(Filesystem.exists)
+ def exists(self, path):
+ return os.path.exists(path)
+
+ def open(self, path, mode='rb'):
+ """
+ Open file for reading or writing
+ """
+ return open(path, mode=mode)
+
+
+class HdfsClient(io._HdfsClient, Filesystem):
+ """
+ Connect to an HDFS cluster. All parameters are optional and should
+ only be set if the defaults need to be overridden.
+
+ Authentication should be automatic if the HDFS cluster uses Kerberos.
+ However, if a username is specified, then the ticket cache will likely
+ be required.
+
+ Parameters
+ ----------
+ host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
+ port : NameNode's port. Set to 0 for default or logical (HA) nodes.
+ user : Username when connecting to HDFS; None implies login user.
+ kerb_ticket : Path to Kerberos ticket cache.
+ driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
+ Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
+ library from Pivotal Labs)
+
+ Notes
+ -----
+ The first time you call this method, it will take longer than usual due
+ to JNI spin-up time.
+
+ Returns
+ -------
+ client : HDFSClient
+ """
+
+ def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
+ driver='libhdfs'):
+ self._connect(host, port, user, kerb_ticket, driver)
+
+ @implements(Filesystem.isdir)
+ def isdir(self, path):
+ return io._HdfsClient.isdir(self, path)
+
+ @implements(Filesystem.isfile)
+ def isfile(self, path):
+ return io._HdfsClient.isfile(self, path)
+
+ @implements(Filesystem.delete)
+ def delete(self, path, recursive=False):
+ return io._HdfsClient.delete(self, path, recursive)
+
+ @implements(Filesystem.mkdir)
+ def mkdir(self, path, create_parents=True):
+ return io._HdfsClient.mkdir(self, path)
+
+ def ls(self, path, full_info=False):
+ """
+ Retrieve directory contents and metadata, if requested.
+
+ Parameters
+ ----------
+ path : HDFS path
+ full_info : boolean, default False
+ If False, only return list of paths
+
+ Returns
+ -------
+ result : list of dicts (full_info=True) or strings (full_info=False)
+ """
+ return io._HdfsClient.ls(self, path, full_info)
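A sketch of how the new Filesystem abstraction is meant to be used (the
directory path is an assumption for illustration): the same calls work
against the local filesystem and, via HdfsClient, against HDFS.

    from pyarrow.filesystem import LocalFilesystem

    fs = LocalFilesystem()
    if fs.isdir('/data/events'):
        for path in fs.ls('/data/events'):  # sorted absolute paths
            print(path, fs.isfile(path))

    # Swapping in HDFS requires only a different Filesystem instance:
    # fs = pyarrow.filesystem.HdfsClient(host='default', port=0)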
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index 417af7d..3137938 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -148,6 +148,8 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
CStatus ListDirectory(const c_string& path,
vector[HdfsPathInfo]* listing)
+ CStatus GetPathInfo(const c_string& path, HdfsPathInfo* info)
+
CStatus Rename(const c_string& src, const c_string& dst)
CStatus OpenReadable(const c_string& path,
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 0f626f1..2621512 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -463,42 +463,17 @@ def strip_hdfs_abspath(path):
return path
-cdef class HdfsClient:
+cdef class _HdfsClient:
cdef:
shared_ptr[CHdfsClient] client
cdef readonly:
bint is_open
- def __cinit__(self, host="default", port=0, user=None, kerb_ticket=None,
- driver='libhdfs'):
- """
- Connect to an HDFS cluster. All parameters are optional and should
- only be set if the defaults need to be overridden.
-
- Authentication should be automatic if the HDFS cluster uses Kerberos.
- However, if a username is specified, then the ticket cache will likely
- be required.
+ def __cinit__(self):
+ pass
- Parameters
- ----------
- host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
- port : NameNode's port. Set to 0 for default or logical (HA) nodes.
- user : Username when connecting to HDFS; None implies login user.
- kerb_ticket : Path to Kerberos ticket cache.
- driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
- Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
- library from Pivotal Labs)
-
- Notes
- -----
- The first time you call this method, it will take longer than usual due
- to JNI spin-up time.
-
- Returns
- -------
- client : HDFSClient
- """
+ def _connect(self, host, port, user, kerb_ticket, driver):
cdef HdfsConnectionConfig conf
if host is not None:
@@ -556,20 +531,25 @@ cdef class HdfsClient:
result = self.client.get().Exists(c_path)
return result
- def ls(self, path, bint full_info=True):
- """
- Retrieve directory contents and metadata, if requested.
+ def isdir(self, path):
+ cdef HdfsPathInfo info
+ self._path_info(path, &info)
+ return info.kind == ObjectType_DIRECTORY
- Parameters
- ----------
- path : HDFS path
- full_info : boolean, default True
- If False, only return list of paths
+ def isfile(self, path):
+ cdef HdfsPathInfo info
+ self._path_info(path, &info)
+ return info.kind == ObjectType_FILE
- Returns
- -------
- result : list of dicts (full_info=True) or strings (full_info=False)
- """
+ cdef _path_info(self, path, HdfsPathInfo* info):
+ cdef c_string c_path = tobytes(path)
+
+ with nogil:
+ check_status(self.client.get()
+ .GetPathInfo(c_path, info))
+
+
+ def ls(self, path, bint full_info):
cdef:
c_string c_path = tobytes(path)
vector[HdfsPathInfo] listing
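The refactored _HdfsClient implements isdir/isfile on top of the new
GetPathInfo binding. A brief sketch of the resulting user-facing calls
(cluster settings and paths are assumptions):

    from pyarrow.filesystem import HdfsClient

    hdfs = HdfsClient(host='default', port=0)
    print(hdfs.isdir('/user/wesm'))             # kind == DIRECTORY
    print(hdfs.isfile('/user/wesm/0.parquet'))  # kind == FILE
    print(hdfs.ls('/user/wesm'))                # plain path list by default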
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 2a1ac9d..cbe1c6e 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -15,8 +15,10 @@
# specific language governing permissions and limitations
# under the License.
-import pyarrow._parquet as _parquet
-from pyarrow.table import Table
+from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa
+ RowGroupMetaData, Schema, ParquetWriter)
+import pyarrow._parquet as _parquet # noqa
+from pyarrow.table import Table, concat_tables
class ParquetFile(object):
@@ -32,7 +34,7 @@ class ParquetFile(object):
Use existing metadata object, rather than reading from file.
"""
def __init__(self, source, metadata=None):
- self.reader = _parquet.ParquetReader()
+ self.reader = ParquetReader()
self.reader.open(source, metadata=metadata)
@property
@@ -67,10 +69,10 @@ class ParquetFile(object):
for column in columns]
arrays = [self.reader.read_column(column_idx)
for column_idx in column_idxs]
- return Table.from_arrays(columns, arrays)
+ return Table.from_arrays(arrays, names=columns)
-def read_table(source, columns=None):
+def read_table(source, columns=None, metadata=None):
"""
Read a Table from Parquet format
@@ -81,17 +83,79 @@ def read_table(source, columns=None):
pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
columns: list
If not None, only these columns will be read from the file.
+ metadata : FileMetaData
+ If separately computed
Returns
-------
- pyarrow.table.Table
+ pyarrow.Table
Content of the file as a table (of columns)
"""
- return ParquetFile(source).read(columns=columns)
+ return ParquetFile(source, metadata=metadata).read(columns=columns)
-def write_table(table, sink, chunk_size=None, version=None,
- use_dictionary=True, compression=None):
+def read_multiple_files(paths, columns=None, filesystem=None, metadata=None,
+ schema=None):
+ """
+ Read multiple Parquet files as a single pyarrow.Table
+
+ Parameters
+ ----------
+ paths : List[str]
+ List of file paths
+ columns : List[str]
+ Names of columns to read from the file
+ filesystem : Filesystem, default None
+ If nothing passed, paths assumed to be found in the local on-disk
+ filesystem
+ metadata : pyarrow.parquet.FileMetaData
+ Use metadata obtained elsewhere to validate file schemas
+ schema : pyarrow.parquet.Schema
+ Use schema obtained elsewhere to validate file schemas. Alternative to
+ metadata parameter
+
+ Returns
+ -------
+ pyarrow.Table
+ Content of the file as a table (of columns)
+ """
+ if filesystem is None:
+ def open_file(path, meta=None):
+ return ParquetFile(path, metadata=meta)
+ else:
+ def open_file(path, meta=None):
+ return ParquetFile(filesystem.open(path, mode='rb'), metadata=meta)
+
+ if len(paths) == 0:
+ raise ValueError('Must pass at least one file path')
+
+ if metadata is None and schema is None:
+ schema = open_file(paths[0]).schema
+ elif schema is None:
+ schema = metadata.schema
+
+ # Verify schemas are all equal
+ all_file_metadata = []
+ for path in paths:
+ file_metadata = open_file(path).metadata
+ if not schema.equals(file_metadata.schema):
+ raise ValueError('Schema in {0} was different. {1!s} vs {2!s}'
+ .format(path, file_metadata.schema, schema))
+ all_file_metadata.append(file_metadata)
+
+ # Read the tables
+ tables = []
+ for path, path_metadata in zip(paths, all_file_metadata):
+ reader = open_file(path, meta=path_metadata)
+ table = reader.read(columns=columns)
+ tables.append(table)
+
+ all_data = concat_tables(tables)
+ return all_data
+
+
+def write_table(table, sink, chunk_size=None, version='1.0',
+ use_dictionary=True, compression='snappy'):
"""
Write a Table to Parquet format
@@ -110,7 +174,7 @@ def write_table(table, sink, chunk_size=None, version=None,
compression : str or dict
Specify the compression codec, either on a general basis or per-column.
"""
- writer = _parquet.ParquetWriter(sink, use_dictionary=use_dictionary,
- compression=compression,
- version=version)
+ writer = ParquetWriter(sink, use_dictionary=use_dictionary,
+ compression=compression,
+ version=version)
writer.write_table(table, row_group_size=chunk_size)
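To make the schema-validation behavior of read_multiple_files concrete, a
short sketch (paths are hypothetical): every file's schema must match either
the first file's schema or one supplied via the metadata/schema parameters.

    import pyarrow.parquet as pq

    paths = ['/data/part-0.parquet', '/data/part-1.parquet']

    # Validates all files against the first file's schema
    table = pq.read_multiple_files(paths)

    # Or validate against separately obtained metadata
    meta = pq.ParquetFile(paths[0]).metadata
    table = pq.read_multiple_files(paths, metadata=meta)
    # A file whose schema differs raises ValueError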
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 0e3b2bd..9242330 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -265,16 +265,35 @@ cdef class Column:
cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
cdef:
Array arr
+ Column col
c_string c_name
vector[shared_ptr[CField]] fields
+ cdef shared_ptr[CDataType] type_
cdef int K = len(arrays)
fields.resize(K)
- for i in range(K):
- arr = arrays[i]
- c_name = tobytes(names[i])
- fields[i].reset(new CField(c_name, arr.type.sp_type, True))
+
+ if len(arrays) == 0:
+ raise ValueError('Must pass at least one array')
+
+ if isinstance(arrays[0], Array):
+ if names is None:
+ raise ValueError('Must pass names when constructing '
+ 'from Array objects')
+ for i in range(K):
+ arr = arrays[i]
+ type_ = arr.type.sp_type
+ c_name = tobytes(names[i])
+ fields[i].reset(new CField(c_name, type_, True))
+ elif isinstance(arrays[0], Column):
+ for i in range(K):
+ col = arrays[i]
+ type_ = col.sp_column.get().type()
+ c_name = tobytes(col.name)
+ fields[i].reset(new CField(c_name, type_, True))
+ else:
+ raise TypeError(type(arrays[0]))
schema.reset(new CSchema(fields))
@@ -429,19 +448,19 @@ cdef class RecordBatch:
pyarrow.table.RecordBatch
"""
names, arrays = _dataframe_to_arrays(df, None, False, schema)
- return cls.from_arrays(names, arrays)
+ return cls.from_arrays(arrays, names)
@staticmethod
- def from_arrays(names, arrays):
+ def from_arrays(arrays, names):
"""
Construct a RecordBatch from multiple pyarrow.Arrays
Parameters
----------
- names: list of str
- Labels for the columns
arrays: list of pyarrow.Array
column-wise data vectors
+ names: list of str
+ Labels for the columns
Returns
-------
@@ -594,20 +613,20 @@ cdef class Table:
names, arrays = _dataframe_to_arrays(df, name=name,
timestamps_to_ms=timestamps_to_ms,
schema=schema)
- return cls.from_arrays(names, arrays, name=name)
+ return cls.from_arrays(arrays, names=names, name=name)
@staticmethod
- def from_arrays(names, arrays, name=None):
+ def from_arrays(arrays, names=None, name=None):
"""
- Construct a Table from Arrow Arrays
+ Construct a Table from Arrow arrays or columns
Parameters
----------
-
- names: list of str
- Names for the table columns
- arrays: list of pyarrow.array.Array
+ arrays: list of pyarrow.Array or pyarrow.Column
Equal-length arrays that should form the table.
+ names: list of str, optional
+ Names for the table columns. If Columns passed, will be
+ inferred. If Arrays passed, this argument is required
name: str, optional
name for the Table
@@ -617,7 +636,6 @@ cdef class Table:
"""
cdef:
- Array arr
c_string c_name
vector[shared_ptr[CField]] fields
vector[shared_ptr[CColumn]] columns
@@ -628,9 +646,15 @@ cdef class Table:
cdef int K = len(arrays)
columns.resize(K)
+
for i in range(K):
- arr = arrays[i]
- columns[i].reset(new CColumn(schema.get().field(i), arr.sp_array))
+ if isinstance(arrays[i], Array):
+ columns[i].reset(new CColumn(schema.get().field(i),
+ (<Array> arrays[i]).sp_array))
+ elif isinstance(arrays[i], Column):
+ columns[i] = (<Column> arrays[i]).sp_column
+ else:
+ raise ValueError(type(arrays[i]))
if name is None:
c_name = ''
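The from_arrays signature change puts the data first and makes names
optional when Column objects are passed. A minimal sketch:

    import pyarrow as pa

    arrays = [pa.from_pylist([1, 2, 3]),
              pa.from_pylist(['a', 'b', 'c'])]

    # Arrays carry no names, so names= is required
    t = pa.Table.from_arrays(arrays, names=['ints', 'strs'])

    # Columns carry their own names, so names= may be omitted
    t2 = pa.Table.from_arrays([t.column(0), t.column(1)])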
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_column.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_column.py b/python/pyarrow/tests/test_column.py
deleted file mode 100644
index 1a507c8..0000000
--- a/python/pyarrow/tests/test_column.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from pyarrow.compat import unittest
-import pyarrow as arrow
-
-A = arrow
-
-import pandas as pd
-
-
-class TestColumn(unittest.TestCase):
-
- def test_basics(self):
- data = [
- A.from_pylist([-10, -5, 0, 5, 10])
- ]
- table = A.Table.from_arrays(('a'), data, 'table_name')
- column = table.column(0)
- assert column.name == 'a'
- assert column.length() == 5
- assert len(column) == 5
- assert column.shape == (5,)
- assert column.to_pylist() == [-10, -5, 0, 5, 10]
-
- def test_pandas(self):
- data = [
- A.from_pylist([-10, -5, 0, 5, 10])
- ]
- table = A.Table.from_arrays(('a'), data, 'table_name')
- column = table.column(0)
- series = column.to_pandas()
- assert series.name == 'a'
- assert series.shape == (5,)
- assert series.iloc[0] == -10
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_convert_builtin.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py
index 72e4389..c06d18d 100644
--- a/python/pyarrow/tests/test_convert_builtin.py
+++ b/python/pyarrow/tests/test_convert_builtin.py
@@ -16,11 +16,12 @@
# specific language governing permissions and limitations
# under the License.
-from pyarrow.compat import unittest, u
+from pyarrow.compat import unittest, u # noqa
import pyarrow
import datetime
+
class TestConvertList(unittest.TestCase):
def test_boolean(self):
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index a2f5062..30705c4 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -74,7 +74,7 @@ class TestPandasConversion(unittest.TestCase):
tm.assert_frame_equal(result, expected)
def _check_array_roundtrip(self, values, expected=None,
- timestamps_to_ms=False, field=None):
+ timestamps_to_ms=False, field=None):
arr = A.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms,
field=field)
result = arr.to_pandas()
@@ -118,7 +118,7 @@ class TestPandasConversion(unittest.TestCase):
ex_frame = pd.DataFrame(dict(zip(names, expected_cols)),
columns=names)
- table = A.Table.from_arrays(names, arrays)
+ table = A.Table.from_arrays(arrays, names)
assert table.schema.equals(A.Schema.from_fields(fields))
result = table.to_pandas()
tm.assert_frame_equal(result, ex_frame)
@@ -169,7 +169,7 @@ class TestPandasConversion(unittest.TestCase):
ex_frame = pd.DataFrame(dict(zip(int_dtypes, expected_cols)),
columns=int_dtypes)
- table = A.Table.from_arrays(int_dtypes, arrays)
+ table = A.Table.from_arrays(arrays, int_dtypes)
result = table.to_pandas()
tm.assert_frame_equal(result, ex_frame)
@@ -201,7 +201,7 @@ class TestPandasConversion(unittest.TestCase):
schema = A.Schema.from_fields([field])
ex_frame = pd.DataFrame({'bools': expected})
- table = A.Table.from_arrays(['bools'], [arr])
+ table = A.Table.from_arrays([arr], ['bools'])
assert table.schema.equals(schema)
result = table.to_pandas()
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index 2056f7a..cb24adb 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -21,9 +21,16 @@ import os
import random
import unittest
+import numpy as np
+import pandas.util.testing as pdt
import pytest
+from pyarrow.compat import guid
+from pyarrow.filesystem import HdfsClient
import pyarrow.io as io
+import pyarrow as pa
+
+import pyarrow.tests.test_parquet as test_parquet
# ----------------------------------------------------------------------
# HDFS tests
@@ -38,7 +45,7 @@ def hdfs_test_client(driver='libhdfs'):
raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
'an integer')
- return io.HdfsClient(host, port, user, driver=driver)
+ return HdfsClient(host, port, user, driver=driver)
class HdfsTestCases(object):
@@ -138,6 +145,43 @@ class HdfsTestCases(object):
assert result == data
+ @test_parquet.parquet
+ def test_hdfs_read_multiple_parquet_files(self):
+ import pyarrow.parquet as pq
+
+ nfiles = 10
+ size = 5
+
+ tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
+
+ self.hdfs.mkdir(tmpdir)
+
+ test_data = []
+ paths = []
+ for i in range(nfiles):
+ df = test_parquet._test_dataframe(size, seed=i)
+
+ df['index'] = np.arange(i * size, (i + 1) * size)
+
+ # Hack so that we don't have a dtype cast in v1 files
+ df['uint32'] = df['uint32'].astype(np.int64)
+
+ path = pjoin(tmpdir, '{0}.parquet'.format(i))
+
+ table = pa.Table.from_pandas(df)
+ with self.hdfs.open(path, 'wb') as f:
+ pq.write_table(table, f)
+
+ test_data.append(table)
+ paths.append(path)
+
+ result = self.hdfs.read_parquet(tmpdir)
+ expected = pa.concat_tables(test_data)
+
+ pdt.assert_frame_equal(result.to_pandas()
+ .sort_values(by='index').reset_index(drop=True),
+ expected.to_pandas())
+
class TestLibHdfs(HdfsTestCases, unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 9cf860a..a94fe45 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -15,10 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+from os.path import join as pjoin
import io
+import os
import pytest
-import pyarrow as A
+from pyarrow.compat import guid
+import pyarrow as pa
import pyarrow.io as paio
import numpy as np
@@ -42,9 +45,9 @@ def test_single_pylist_column_roundtrip(tmpdir):
for dtype in [int, float]:
filename = tmpdir.join('single_{}_column.parquet'
.format(dtype.__name__))
- data = [A.from_pylist(list(map(dtype, range(5))))]
- table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
- A.parquet.write_table(table, filename.strpath)
+ data = [pa.from_pylist(list(map(dtype, range(5))))]
+ table = pa.Table.from_arrays(data, names=('a', 'b'), name='table_name')
+ pq.write_table(table, filename.strpath)
table_read = pq.read_table(filename.strpath)
for col_written, col_read in zip(table.itercolumns(),
table_read.itercolumns()):
@@ -85,8 +88,8 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
df = alltypes_sample(size=10000)
filename = tmpdir.join('pandas_rountrip.parquet')
- arrow_table = A.Table.from_pandas(df, timestamps_to_ms=True)
- A.parquet.write_table(arrow_table, filename.strpath, version="2.0")
+ arrow_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+ pq.write_table(arrow_table, filename.strpath, version="2.0")
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
pdt.assert_frame_equal(df, df_read)
@@ -113,8 +116,8 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
'empty_str': [''] * size
})
filename = tmpdir.join('pandas_rountrip.parquet')
- arrow_table = A.Table.from_pandas(df)
- A.parquet.write_table(arrow_table, filename.strpath, version="1.0")
+ arrow_table = pa.Table.from_pandas(df)
+ pq.write_table(arrow_table, filename.strpath, version="1.0")
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
@@ -133,28 +136,39 @@ def test_pandas_column_selection(tmpdir):
'uint16': np.arange(size, dtype=np.uint16)
})
filename = tmpdir.join('pandas_rountrip.parquet')
- arrow_table = A.Table.from_pandas(df)
- A.parquet.write_table(arrow_table, filename.strpath)
+ arrow_table = pa.Table.from_pandas(df)
+ pq.write_table(arrow_table, filename.strpath)
table_read = pq.read_table(filename.strpath, columns=['uint8'])
df_read = table_read.to_pandas()
pdt.assert_frame_equal(df[['uint8']], df_read)
-def _test_dataframe(size=10000):
- np.random.seed(0)
+def _random_integers(size, dtype):
+ # We do not generate integers outside the int64 range
+ i64_info = np.iinfo('int64')
+ iinfo = np.iinfo(dtype)
+ return np.random.randint(max(iinfo.min, i64_info.min),
+ min(iinfo.max, i64_info.max),
+ size=size).astype(dtype)
+
+
+def _test_dataframe(size=10000, seed=0):
+ np.random.seed(seed)
df = pd.DataFrame({
- 'uint8': np.arange(size, dtype=np.uint8),
- 'uint16': np.arange(size, dtype=np.uint16),
- 'uint32': np.arange(size, dtype=np.uint32),
- 'uint64': np.arange(size, dtype=np.uint64),
- 'int8': np.arange(size, dtype=np.int16),
- 'int16': np.arange(size, dtype=np.int16),
- 'int32': np.arange(size, dtype=np.int32),
- 'int64': np.arange(size, dtype=np.int64),
- 'float32': np.arange(size, dtype=np.float32),
+ 'uint8': _random_integers(size, np.uint8),
+ 'uint16': _random_integers(size, np.uint16),
+ 'uint32': _random_integers(size, np.uint32),
+ 'uint64': _random_integers(size, np.uint64),
+ 'int8': _random_integers(size, np.int8),
+ 'int16': _random_integers(size, np.int16),
+ 'int32': _random_integers(size, np.int32),
+ 'int64': _random_integers(size, np.int64),
+ 'float32': np.random.randn(size).astype(np.float32),
+ 'float64': np.random.randn(size),
- 'float64': np.arange(size, dtype=np.float64),
- 'bool': np.random.randn(size) > 0
+ 'bool': np.random.randn(size) > 0,
+ 'strings': [pdt.rands(10) for i in range(size)]
})
return df
@@ -162,7 +176,7 @@ def _test_dataframe(size=10000):
@parquet
def test_pandas_parquet_native_file_roundtrip(tmpdir):
df = _test_dataframe(10000)
- arrow_table = A.Table.from_pandas(df)
+ arrow_table = pa.Table.from_pandas(df)
imos = paio.InMemoryOutputStream()
pq.write_table(arrow_table, imos, version="2.0")
buf = imos.get_result()
@@ -183,10 +197,10 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir):
'strings': ['foo', 'bar', None, 'baz', 'qux']
})
- arrow_table = A.Table.from_pandas(df)
+ arrow_table = pa.Table.from_pandas(df)
with open(filename, 'wb') as f:
- A.parquet.write_table(arrow_table, f, version="1.0")
+ pq.write_table(arrow_table, f, version="1.0")
data = io.BytesIO(open(filename, 'rb').read())
@@ -213,31 +227,27 @@ def test_pandas_parquet_configuration_options(tmpdir):
'bool': np.random.randn(size) > 0
})
filename = tmpdir.join('pandas_rountrip.parquet')
- arrow_table = A.Table.from_pandas(df)
+ arrow_table = pa.Table.from_pandas(df)
for use_dictionary in [True, False]:
- A.parquet.write_table(
- arrow_table,
- filename.strpath,
- version="2.0",
- use_dictionary=use_dictionary)
+ pq.write_table(arrow_table, filename.strpath,
+ version="2.0",
+ use_dictionary=use_dictionary)
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
pdt.assert_frame_equal(df, df_read)
for compression in ['NONE', 'SNAPPY', 'GZIP']:
- A.parquet.write_table(
- arrow_table,
- filename.strpath,
- version="2.0",
- compression=compression)
+ pq.write_table(arrow_table, filename.strpath,
+ version="2.0",
+ compression=compression)
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
pdt.assert_frame_equal(df, df_read)
def make_sample_file(df):
- a_table = A.Table.from_pandas(df, timestamps_to_ms=True)
+ a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
buf = io.BytesIO()
pq.write_table(a_table, buf, compression='SNAPPY', version='2.0')
@@ -315,7 +325,7 @@ def test_pass_separate_metadata():
# ARROW-471
df = alltypes_sample(size=10000)
- a_table = A.Table.from_pandas(df, timestamps_to_ms=True)
+ a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
buf = io.BytesIO()
pq.write_table(a_table, buf, compression='snappy', version='2.0')
@@ -328,3 +338,72 @@ def test_pass_separate_metadata():
fileh = pq.ParquetFile(buf, metadata=metadata)
pdt.assert_frame_equal(df, fileh.read().to_pandas())
+
+
+@parquet
+def test_read_multiple_files(tmpdir):
+ nfiles = 10
+ size = 5
+
+ dirpath = tmpdir.join(guid()).strpath
+ os.mkdir(dirpath)
+
+ test_data = []
+ paths = []
+ for i in range(nfiles):
+ df = _test_dataframe(size, seed=i)
+
+ # Hack so that we don't have a dtype cast in v1 files
+ df['uint32'] = df['uint32'].astype(np.int64)
+
+ path = pjoin(dirpath, '{0}.parquet'.format(i))
+
+ table = pa.Table.from_pandas(df)
+ pq.write_table(table, path)
+
+ test_data.append(table)
+ paths.append(path)
+
+ result = pq.read_multiple_files(paths)
+ expected = pa.concat_tables(test_data)
+
+ assert result.equals(expected)
+
+ # Read with provided metadata
+ metadata = pq.ParquetFile(paths[0]).metadata
+
+ result2 = pq.read_multiple_files(paths, metadata=metadata)
+ assert result2.equals(expected)
+
+ result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
+ assert result3.equals(expected)
+
+ # Read column subset
+ to_read = [result[0], result[3], result[6]]
+ result = pa.localfs.read_parquet(
+ dirpath, columns=[c.name for c in to_read])
+ expected = pa.Table.from_arrays(to_read)
+ assert result.equals(expected)
+
+ # Test failure modes with non-uniform metadata
+ bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
+ bad_apple_path = tmpdir.join('{0}.parquet'.format(guid())).strpath
+
+ t = pa.Table.from_pandas(bad_apple)
+ pq.write_table(t, bad_apple_path)
+
+ bad_meta = pq.ParquetFile(bad_apple_path).metadata
+
+ with pytest.raises(ValueError):
+ pq.read_multiple_files(paths + [bad_apple_path])
+
+ with pytest.raises(ValueError):
+ pq.read_multiple_files(paths, metadata=bad_meta)
+
+ mixed_paths = [bad_apple_path, paths[0]]
+
+ with pytest.raises(ValueError):
+ pq.read_multiple_files(mixed_paths, schema=bad_meta.schema)
+
+ with pytest.raises(ValueError):
+ pq.read_multiple_files(mixed_paths)
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_scalars.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_scalars.py b/python/pyarrow/tests/test_scalars.py
index 62e51f8..ef600a0 100644
--- a/python/pyarrow/tests/test_scalars.py
+++ b/python/pyarrow/tests/test_scalars.py
@@ -32,7 +32,7 @@ class TestScalars(unittest.TestCase):
v = arr[0]
assert isinstance(v, A.BooleanValue)
assert repr(v) == "True"
- assert v.as_py() == True
+ assert v.as_py() is True
assert arr[1] is A.NA
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_schema.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py
index 4aa8112..507ebb8 100644
--- a/python/pyarrow/tests/test_schema.py
+++ b/python/pyarrow/tests/test_schema.py
@@ -85,4 +85,3 @@ baz: list<item: int8>"""
del fields[-1]
sch3 = A.schema(fields)
assert not sch1.equals(sch3)
-
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 6f00c73..d49b33c 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -21,16 +21,43 @@ from pandas.util.testing import assert_frame_equal
import pandas as pd
import pytest
+from pyarrow.compat import unittest
import pyarrow as pa
+class TestColumn(unittest.TestCase):
+
+ def test_basics(self):
+ data = [
+ pa.from_pylist([-10, -5, 0, 5, 10])
+ ]
+ table = pa.Table.from_arrays(data, names=['a'], name='table_name')
+ column = table.column(0)
+ assert column.name == 'a'
+ assert column.length() == 5
+ assert len(column) == 5
+ assert column.shape == (5,)
+ assert column.to_pylist() == [-10, -5, 0, 5, 10]
+
+ def test_pandas(self):
+ data = [
+ pa.from_pylist([-10, -5, 0, 5, 10])
+ ]
+ table = pa.Table.from_arrays(data, names=['a'], name='table_name')
+ column = table.column(0)
+ series = column.to_pandas()
+ assert series.name == 'a'
+ assert series.shape == (5,)
+ assert series.iloc[0] == -10
+
+
def test_recordbatch_basics():
data = [
pa.from_pylist(range(5)),
pa.from_pylist([-10, -5, 0, 5, 10])
]
- batch = pa.RecordBatch.from_arrays(['c0', 'c1'], data)
+ batch = pa.RecordBatch.from_arrays(data, ['c0', 'c1'])
assert len(batch) == 5
assert batch.num_rows == 5
@@ -95,7 +122,7 @@ def test_table_basics():
pa.from_pylist(range(5)),
pa.from_pylist([-10, -5, 0, 5, 10])
]
- table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
+ table = pa.Table.from_arrays(data, names=('a', 'b'), name='table_name')
assert table.name == 'table_name'
assert len(table) == 5
assert table.num_rows == 5
@@ -121,19 +148,19 @@ def test_concat_tables():
[1., 2., 3., 4., 5.]
]
- t1 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x)
- for x in data], 'table_name')
- t2 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x)
- for x in data2], 'table_name')
+ t1 = pa.Table.from_arrays([pa.from_pylist(x) for x in data],
+ names=('a', 'b'), name='table_name')
+ t2 = pa.Table.from_arrays([pa.from_pylist(x) for x in data2],
+ names=('a', 'b'), name='table_name')
result = pa.concat_tables([t1, t2], output_name='foo')
assert result.name == 'foo'
assert len(result) == 10
- expected = pa.Table.from_arrays(
- ('a', 'b'), [pa.from_pylist(x + y)
- for x, y in zip(data, data2)],
- 'foo')
+ expected = pa.Table.from_arrays([pa.from_pylist(x + y)
+ for x, y in zip(data, data2)],
+ names=('a', 'b'),
+ name='foo')
assert result.equals(expected)
@@ -143,7 +170,8 @@ def test_table_pandas():
pa.from_pylist(range(5)),
pa.from_pylist([-10, -5, 0, 5, 10])
]
- table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
+ table = pa.Table.from_arrays(data, names=('a', 'b'),
+ name='table_name')
# TODO: Use this part once from_pandas is implemented
# data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
http://git-wip-us.apache.org/repos/asf/arrow/blob/53a478df/python/pyarrow/util.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py
new file mode 100644
index 0000000..4b6a835
--- /dev/null
+++ b/python/pyarrow/util.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Miscellaneous utility code
+
+
+def implements(f):
+ def decorator(g):
+ g.__doc__ = f.__doc__
+ return g
+ return decorator
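The implements decorator simply copies the interface method's docstring onto
the concrete implementation, which is how LocalFilesystem and HdfsClient
pick up the Filesystem docs above. A toy sketch (class names illustrative):

    from pyarrow.util import implements

    class Iface(object):
        def ls(self, path):
            """Return list of file paths"""
            raise NotImplementedError

    class Impl(Iface):
        @implements(Iface.ls)
        def ls(self, path):
            return []

    print(Impl.ls.__doc__)  # -> Return list of file paths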