You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text conversion.)
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/07/23 16:32:25 UTC
[arrow] branch master updated: ARROW-2891: [Python] Preserve schema in write_to_dataset
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 0ecf88d ARROW-2891: [Python] Preserve schema in write_to_dataset
0ecf88d is described below
commit 0ecf88de502dcc9b1c65c7fc353627fbdb8ea6e4
Author: johnkulzick <jk...@civisanalytics.com>
AuthorDate: Mon Jul 23 18:32:15 2018 +0200
ARROW-2891: [Python] Preserve schema in write_to_dataset
Author: johnkulzick <jk...@civisanalytics.com>
Closes #2302 from jkulzick/ARROW-2891-preserve-schema and squashes the following commits:
09ff40c6 <johnkulzick> remove copy of schema
83d00486 <johnkulzick> remove unused copy import from test_parquet
0bd54c05 <johnkulzick> preserve schema when writing dataset with partition columns
---
python/pyarrow/parquet.py | 9 ++++++++-
python/pyarrow/tests/test_parquet.py | 23 ++++++++++++++++++++---
2 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index f97c871..85cec67 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1137,6 +1137,12 @@ def write_to_dataset(table, root_path, partition_cols=None,
data_cols = df.columns.drop(partition_cols)
if len(data_cols) == 0:
raise ValueError("No data left to save outside partition columns")
+ subschema = table.schema
+ # ARROW-2891: Ensure the output_schema is preserved when writing a
+ # partitioned dataset
+ for partition_col in partition_cols:
+ subschema = subschema.remove(
+ subschema.get_field_index(partition_col))
for keys, subgroup in data_df.groupby(partition_keys):
if not isinstance(keys, tuple):
keys = (keys,)
@@ -1144,7 +1150,8 @@ def write_to_dataset(table, root_path, partition_cols=None,
["{colname}={value}".format(colname=name, value=val)
for name, val in zip(partition_cols, keys)])
subtable = Table.from_pandas(subgroup,
- preserve_index=preserve_index)
+ preserve_index=preserve_index,
+ schema=subschema)
prefix = "/".join([root_path, subdir])
_mkdir_if_not_exists(fs, prefix)
outfile = compat.guid() + ".parquet"
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index fec65b9..d6ca7dd 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1731,18 +1731,21 @@ def test_read_table_doesnt_warn():
assert len(record) == 0
-def _test_write_to_dataset_with_partitions(base_path, filesystem=None):
+def _test_write_to_dataset_with_partitions(base_path,
+ filesystem=None,
+ schema=None):
# ARROW-1400
import pyarrow.parquet as pq
output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
'group2': list('eefeffgeee'),
'num': list(range(10)),
+ 'nan': [pd.np.nan] * 10,
'date': np.arange('2017-01-01', '2017-01-11',
dtype='datetime64[D]')})
cols = output_df.columns.tolist()
partition_by = ['group1', 'group2']
- output_table = pa.Table.from_pandas(output_df)
+ output_table = pa.Table.from_pandas(output_df, schema=schema)
pq.write_to_dataset(output_table, base_path, partition_by,
filesystem=filesystem)
@@ -1754,7 +1757,11 @@ def _test_write_to_dataset_with_partitions(base_path, filesystem=None):
else:
pq.write_metadata(output_table.schema, metadata_path)
- dataset = pq.ParquetDataset(base_path, filesystem=filesystem)
+ # ARROW-2891: Ensure the output_schema is preserved when writing a
+ # partitioned dataset
+ dataset = pq.ParquetDataset(base_path,
+ filesystem=filesystem,
+ validate_schema=True)
# ARROW-2209: Ensure the dataset schema also includes the partition columns
dataset_cols = set(dataset.schema.to_arrow_schema().names)
assert dataset_cols == set(output_table.schema.names)
@@ -1814,6 +1821,16 @@ def test_write_to_dataset_with_partitions(tmpdir):
@parquet
+def test_write_to_dataset_with_partitions_and_schema(tmpdir):
+ schema = pa.schema([pa.field('group1', type=pa.string()),
+ pa.field('group2', type=pa.string()),
+ pa.field('num', type=pa.int64()),
+ pa.field('nan', type=pa.int32()),
+ pa.field('date', type=pa.timestamp(unit='us'))])
+ _test_write_to_dataset_with_partitions(str(tmpdir), schema=schema)
+
+
+@parquet
def test_write_to_dataset_no_partitions(tmpdir):
_test_write_to_dataset_no_partitions(str(tmpdir))