Posted to commits@arrow.apache.org by uw...@apache.org on 2018/07/23 16:32:25 UTC

[arrow] branch master updated: ARROW-2891: [Python] Preserve schema in write_to_dataset

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ecf88d  ARROW-2891: [Python] Preserve schema in write_to_dataset
0ecf88d is described below

commit 0ecf88de502dcc9b1c65c7fc353627fbdb8ea6e4
Author: johnkulzick <jk...@civisanalytics.com>
AuthorDate: Mon Jul 23 18:32:15 2018 +0200

    ARROW-2891: [Python] Preserve schema in write_to_dataset
    
    Author: johnkulzick <jk...@civisanalytics.com>
    
    Closes #2302 from jkulzick/ARROW-2891-preserve-schema and squashes the following commits:
    
    09ff40c6 <johnkulzick> remove copy of schema
    83d00486 <johnkulzick> remove unused copy import from test_parquet
    0bd54c05 <johnkulzick> preserve schema when writing dataset with partition columns
---
 python/pyarrow/parquet.py            |  9 ++++++++-
 python/pyarrow/tests/test_parquet.py | 23 ++++++++++++++++++++---
 2 files changed, 28 insertions(+), 4 deletions(-)
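
A hedged usage sketch of the behavior this change targets (the path, column
names, and types below are illustrative, not taken from the patch): when an
explicit schema is passed to Table.from_pandas, that schema should survive the
per-partition round trip through pandas inside write_to_dataset instead of
being re-inferred for each group.

    import pandas as pd
    import numpy as np
    import pyarrow as pa
    import pyarrow.parquet as pq

    df = pd.DataFrame({'group': list('aabb'),
                       'num': np.arange(4)})
    # Request int32 explicitly; conversion from pandas would typically infer int64.
    schema = pa.schema([pa.field('group', pa.string()),
                        pa.field('num', pa.int32())])
    table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

    pq.write_to_dataset(table, '/tmp/example_dataset',
                        partition_cols=['group'], preserve_index=False)

    # With this patch, 'num' comes back as int32 rather than being re-inferred
    # as int64 when each partition group is converted from pandas again.
    dataset = pq.ParquetDataset('/tmp/example_dataset')
    print(dataset.read().schema)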

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index f97c871..85cec67 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1137,6 +1137,12 @@ def write_to_dataset(table, root_path, partition_cols=None,
         data_cols = df.columns.drop(partition_cols)
         if len(data_cols) == 0:
             raise ValueError("No data left to save outside partition columns")
+        subschema = table.schema
+        # ARROW-2891: Ensure the output_schema is preserved when writing a
+        # partitioned dataset
+        for partition_col in partition_cols:
+            subschema = subschema.remove(
+                subschema.get_field_index(partition_col))
         for keys, subgroup in data_df.groupby(partition_keys):
             if not isinstance(keys, tuple):
                 keys = (keys,)
@@ -1144,7 +1150,8 @@ def write_to_dataset(table, root_path, partition_cols=None,
                 ["{colname}={value}".format(colname=name, value=val)
                  for name, val in zip(partition_cols, keys)])
             subtable = Table.from_pandas(subgroup,
-                                         preserve_index=preserve_index)
+                                         preserve_index=preserve_index,
+                                         schema=subschema)
             prefix = "/".join([root_path, subdir])
             _mkdir_if_not_exists(fs, prefix)
             outfile = compat.guid() + ".parquet"
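
The core of the fix is the subschema pruning above: strip the partition
columns from the original table's schema and hand the remainder to
Table.from_pandas for each group. A small standalone sketch of just that step
(field names here are illustrative):

    import pyarrow as pa

    full_schema = pa.schema([pa.field('group1', pa.string()),
                             pa.field('group2', pa.string()),
                             pa.field('num', pa.int32())])

    subschema = full_schema
    for partition_col in ['group1', 'group2']:
        # get_field_index looks up the field's position by name;
        # remove returns a new schema without the field at that position.
        subschema = subschema.remove(subschema.get_field_index(partition_col))

    print(subschema.names)  # ['num'] -- only the non-partition fields remain
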
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index fec65b9..d6ca7dd 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1731,18 +1731,21 @@ def test_read_table_doesnt_warn():
     assert len(record) == 0
 
 
-def _test_write_to_dataset_with_partitions(base_path, filesystem=None):
+def _test_write_to_dataset_with_partitions(base_path,
+                                           filesystem=None,
+                                           schema=None):
     # ARROW-1400
     import pyarrow.parquet as pq
 
     output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
                               'group2': list('eefeffgeee'),
                               'num': list(range(10)),
+                              'nan': [pd.np.nan] * 10,
                               'date': np.arange('2017-01-01', '2017-01-11',
                                                 dtype='datetime64[D]')})
     cols = output_df.columns.tolist()
     partition_by = ['group1', 'group2']
-    output_table = pa.Table.from_pandas(output_df)
+    output_table = pa.Table.from_pandas(output_df, schema=schema)
     pq.write_to_dataset(output_table, base_path, partition_by,
                         filesystem=filesystem)
 
@@ -1754,7 +1757,11 @@ def _test_write_to_dataset_with_partitions(base_path, filesystem=None):
     else:
         pq.write_metadata(output_table.schema, metadata_path)
 
-    dataset = pq.ParquetDataset(base_path, filesystem=filesystem)
+    # ARROW-2891: Ensure the output_schema is preserved when writing a
+    # partitioned dataset
+    dataset = pq.ParquetDataset(base_path,
+                                filesystem=filesystem,
+                                validate_schema=True)
     # ARROW-2209: Ensure the dataset schema also includes the partition columns
     dataset_cols = set(dataset.schema.to_arrow_schema().names)
     assert dataset_cols == set(output_table.schema.names)
@@ -1814,6 +1821,16 @@ def test_write_to_dataset_with_partitions(tmpdir):
 
 
 @parquet
+def test_write_to_dataset_with_partitions_and_schema(tmpdir):
+    schema = pa.schema([pa.field('group1', type=pa.string()),
+                        pa.field('group2', type=pa.string()),
+                        pa.field('num', type=pa.int64()),
+                        pa.field('nan', type=pa.int32()),
+                        pa.field('date', type=pa.timestamp(unit='us'))])
+    _test_write_to_dataset_with_partitions(str(tmpdir), schema=schema)
+
+
+@parquet
 def test_write_to_dataset_no_partitions(tmpdir):
     _test_write_to_dataset_no_partitions(str(tmpdir))
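
As a hedged follow-up to the usage sketch above (the dataset path and layout
are illustrative), the preserved schema can also be checked on the individual
partition files: each file should carry only the non-partition fields, with
the explicitly requested types.

    import glob
    import pyarrow.parquet as pq

    for path in glob.glob('/tmp/example_dataset/group=*/*.parquet'):
        # The partition value lives in the directory name, so each file's
        # schema should contain just the remaining field(s), e.g. 'num': int32.
        print(path, pq.read_table(path).schema)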