Posted to user@arrow.apache.org by Antonino Ingargiola <tr...@gmail.com> on 2022/01/24 12:39:26 UTC

[Python] add a new column to a table during dataset consolidation

Hi list,

I am looking for a way to add a new column to an existing table that is
computed as the sum/mean of other columns. From the docs, I understand
that pyarrow compute functions operate on arrays (i.e. columns), but I
cannot find whether it is possible to aggregate across columns in some way.

In addition, I use the scanner API to load and re-save a parquet dataset in
order to consolidate the small files in each partition, following the example
from the docs [1]. But now I would like to extend the consolidation step by
adding new columns before saving the new dataset.

Here is an example of what I want to achieve. In this case I am using
pandas and scanner.to_batches():

import pyarrow.dataset as ds
import pyarrow as pa
from glob import glob
from uuid import uuid4

# Create the input dataset
data_dict = {'partition': [1, 1, 2, 2],
             'a': [1, 2, 3, 4],
             'b': [2, 4, 6, 8]}
table = pa.Table.from_pydict(data_dict)

in_arrow_path = 'example_input'
out_arrow_path = 'example_output'

ds.write_dataset(table, in_arrow_path, format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 existing_data_behavior='delete_matching',
                 basename_template=f'{uuid4()}-{{i}}')

print('\n'.join(glob(f'{in_arrow_path}/**/*')))
dataset = ds.dataset(in_arrow_path, partitioning='hive')
print(dataset.to_table().to_pandas())

# Re-save the input dataset adding a new column ("consolidation")
scanner = dataset.scanner()
cols = ['a', 'b']

for batch in scanner.to_batches():
    df = batch.to_pandas()
    df['mean'] = df[cols].mean(axis=1)
    new_batch = pa.RecordBatch.from_pandas(df)
    ds.write_dataset(
        new_batch, out_arrow_path,
        format='parquet',
        partitioning=['partition'],
        partitioning_flavor='hive',
        existing_data_behavior='delete_matching',
        basename_template=f'{uuid4()}-{{i}}.parquet'
    )

print('\n'.join(glob(f'{out_arrow_path}/**/*')))
new_dataset = ds.dataset(out_arrow_path, partitioning='hive')
print(new_dataset.to_table().to_pandas())

So the questions are:

1. Can I add the new column (mean) directly in pyarrow?
2. Do I need to write a loop batch by batch as above or is there a way to
apply the transformation through the scanner API like in [1]?

Thanks for any advice.

Antonio

[1]
https://arrow.apache.org/docs/python/dataset.html#writing-large-amounts-of-data

Re: [Python] add a new column to a table during dataset consolidation

Posted by Antonino Ingargiola <tr...@gmail.com>.
@Niranda Perera thanks for the clarification

@Weston Pace <we...@gmail.com> thanks for the detailed answer, each
bit was very informative!

Indeed, as you mentioned, wrapping the scanner output is necessary,
otherwise ds.write_dataset will not perform any consolidation, so my first
example transforms but does not consolidate (e.g. if the input has several
parquet files in a partition, they **will not** be saved as a single file in
the output).

I would like to implement a mechanism so I can pass arbitrary
transformations, similar to what you mention regarding the future python
UDF support.

Right now, since I need to transform both the batch and the schema, I put
both transformations in a single object and came up with the following
working example based on what you suggested (code appended below). To
implement a new column-wise function, the user only has to implement a
method taking a pandas DataFrame and returning a Series; the object does
the rest.

Now, this workaround of tying batch and schema modifications together with
an ad-hoc object works. But, ideally, I would like to create a new, modified
scanner object that performs the transformation under the hood and exposes
the proper scanner API. In this way, ds.write_dataset would not need the
additional schema argument. It would be nice to modify a scanner object
through a decorator. Would this be possible?
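
For instance, something along these lines is what I have in mind (an untested
sketch, reusing the transformer object from the appended example; it assumes
ds.Scanner.from_batches accepts an iterator of record batches together with an
explicit schema, which I have not verified across pyarrow versions):

```
# Hypothetical wrapper: rebuild a proper Scanner from the transformed
# batches, so ds.write_dataset no longer needs the extra schema argument.
def as_scanner(transformer: "ScannerTransformer") -> ds.Scanner:
    return ds.Scanner.from_batches(transformer.transform_scanner(),
                                   schema=transformer.schema)

wrapped_scanner = as_scanner(transformer)
ds.write_dataset(wrapped_scanner, out_arrow_path, format='parquet',
                 partitioning=['partition'], partitioning_flavor='hive')
```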

Also, would there be any potential issues when the dataset is saved to a
cloud filesystem such as S3?
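
For concreteness, what I have in mind is simply passing an S3 filesystem to
ds.write_dataset, roughly like the sketch below (bucket name and region are
placeholders, and credentials would come from the environment):

```
from pyarrow import fs

s3 = fs.S3FileSystem(region='us-east-1')  # placeholder region
ds.write_dataset(wrapped_scanner, 'my-bucket/example_output',
                 filesystem=s3,
                 format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive')
```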

Best,
Antonio

Example [1]
```
from abc import ABC, abstractmethod
import shutil
from typing import Iterator, Optional
import pyarrow.dataset as ds
import pyarrow as pa
import pyarrow.parquet as pq
from glob import glob
from uuid import uuid4
import pandas as pd


def file_visitor(written_file):
    print(f"Saved partitioned file file_path={str(written_file.path)}")
    # table = pq.read_table(written_file.path)
    parquet_file = pq.ParquetFile(written_file.path)
    print(f'{parquet_file.metadata=}')
    print(f'{parquet_file.schema=}')


def append_column_to_batch(batch: pa.RecordBatch, arr: pa.Array,
                           name: str) -> pa.RecordBatch:
    """
    Utility function to append a column to a RecordBatch.

    This is useful until pa.RecordBatch gains an `.append_column` method
    """
    # NOTE: It's a metadata-only (zero-copy) operation so it shouldn't
    # take much time
    tab = pa.Table.from_batches([batch])
    field = pa.field(name, arr.type)
    new_tab = tab.append_column(field, arr)
    return new_tab.to_batches()[0]


in_arrow_path = 'example_input'
out_arrow_path = 'example_output'


# Create the input dataset
data_dict1 = {'partition': [1, 1, 2, 2],
              'a': [1, 2, 3, 4],
              'b': [2, 4, 6, 8],
              'c': [10, 11, 12, 13]}
data_dict2 = {'partition': [1, 1, 2, 2],
              'a': [5, 6, 7, 8],
              'b': [10, 12, 14, 16],
              'c': [20, 21, 22, 23]}
table1 = pa.Table.from_pydict(data_dict1)
table2 = pa.Table.from_pydict(data_dict2)


ds.write_dataset(table1, in_arrow_path, format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 existing_data_behavior='delete_matching',
                 basename_template=f'{uuid4()}-{{i}}.parquet')
ds.write_dataset(table2, in_arrow_path, format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 existing_data_behavior='overwrite_or_ignore',
                 basename_template=f'{uuid4()}-{{i}}.parquet')
print('\n'.join(glob(f'{in_arrow_path}/**/*')))
dataset = ds.dataset(in_arrow_path, partitioning='hive')
print(dataset.to_table().to_pandas())

shutil.rmtree(out_arrow_path, ignore_errors=True)


# Re-save the input dataset adding a new column ("consolidation")
scanner = dataset.scanner()


## Transformer

class ScannerTransformer(ABC):
    """
    Holds the logic to wrap a scanner to apply a transformation to each
batch
    and the logic to compute the modified schema. The schema is needed by
    ds.write_dataset when receiving an iterator of batches instead of a
scanner
    """
    def __init__(self,
                 scanner: ds.Scanner,
                 input_cols: list[str],
                 output_col: str,
                 output_col_type: str,
                 transform_df_args: Optional[dict] = None,
                 ) -> None:
        self.scanner = scanner
        self.input_cols = input_cols
        self.output_col = output_col
        self.output_col_type = output_col_type
        self.transform_df_args = transform_df_args if transform_df_args
else {}

    def transform_scanner(self) -> Iterator[pa.RecordBatch]:
        """
        Yields a transformed pa.RecordBatch at each iteration
        """
        for batch in self.scanner.to_batches():
            new_batch = self.transform_batch(batch)
            yield new_batch

    def transform_schema(self) -> pa.Schema:
        new_field = pa.field(self.output_col, self.output_col_type)
        schema = self.scanner.projected_schema.append(new_field)
        return schema

    @property
    def schema(self) -> pa.Schema:
        if not hasattr(self, '_schema'):
            self._schema = self.transform_schema()
        return self._schema

    def transform_batch(self, batch: pa.RecordBatch) -> pa.RecordBatch:
        table = pa.Table.from_batches([batch])
        table = table.select(self.input_cols)
        df = table.to_pandas()

        out_series = (self.transform_df(df, **self.transform_df_args)
                      .astype(self.output_col_type))

        out_array = pa.Array.from_pandas(out_series)
        new_batch = append_column_to_batch(batch, out_array,
                                           self.output_col)
        return new_batch

    @abstractmethod
    def transform_df(self, df: pd.DataFrame, **kwargs) -> pd.Series:
        ...


class AppendAggCol(ScannerTransformer):

    def transform_df(self, df: pd.DataFrame, agg_func: str = 'mean',
                     ) -> pd.Series:
        out_series = df.agg(agg_func, axis=1)
        return out_series


transformer = AppendAggCol(scanner, input_cols=['a', 'b'],
                           output_col='out', output_col_type='float64',
                           transform_df_args={'agg_func': 'mean'})
wrapped_scanner = transformer.transform_scanner()

print('-----------------------------')
ds.write_dataset(wrapped_scanner, out_arrow_path,
                 schema=transformer.schema,
                 format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 existing_data_behavior='overwrite_or_ignore',
                 basename_template=f'{uuid4()}-{{i}}.parquet',
                 file_visitor=file_visitor)

print('\n'.join(glob(f'{out_arrow_path}/**/*')))
dataset = ds.dataset(out_arrow_path, partitioning='hive')
print(dataset.to_table().to_pandas())
```

Re: [Python] add a new column to a table during dataset consolidation

Posted by Weston Pace <we...@gmail.com>.
> You are looking for a row-wise mean, aren't you? I don't think there's an API for that in pyarrow.compute.

Right, I don't think this is in there today either.  The C++ compute
infrastructure itself can create functions that run on record batches
(instead of just arrays).  An example of this is drop_null (which will
drop each row if any value in that row is null).  So it should be
possible for you to create a kernel that does this but, today, you
would have to write the kernel in C++.
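
That said, for a known, fixed list of numeric columns you can emulate a
row-wise mean by composing the existing array kernels (just a sketch, and it
materializes one intermediate array per addition):

    import pyarrow.compute as pc

    def row_wise_mean(batch, cols):
        # Sum the columns pairwise, then divide; cast first so the
        # result is float64 even when the inputs are integer columns.
        total = batch[cols[0]]
        for name in cols[1:]:
            total = pc.add(total, batch[name])
        return pc.divide(pc.cast(total, pa.float64()), len(cols))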

> I wasn't aware that the pyarrow API didn't have an add_column method (sorry again!).

Until this is fixed you can work around this by wrapping the batch
with a table.  It's a metadata-only (zero-copy) operation so shouldn't
take much time:

    def add_column_to_batch(batch, arr, name):
        tab = pa.Table.from_batches([batch])
        field = pa.field(name, arr.type)
        new_tab = tab.append_column(field, arr)
        return new_tab.to_batches()[0]

> Do I need to write a loop batch by batch as above or is there a way to apply the transformation through the scanner API like in [1]?

The write_dataset method (and most places that accept a scanner I
think) can accept an iterable of batches (although you need to supply
a schema in that case).  Using python generators you can quickly
create an iterable of batches from a scanner.  So for your example it
would look something like:

    schema = table.schema
    schema = schema.append(pa.field('mean', pa.float64()))

    def transform_scanner(scanner):
        for batch in scanner.to_batches():
            df = batch.to_pandas()
            df['mean'] = df[cols].mean(axis=1)
            new_batch = pa.RecordBatch.from_pandas(df)
            yield new_batch

    ds.write_dataset(
        transform_scanner(scanner), out_arrow_path,
        schema=schema,
        format='parquet',
        partitioning=['partition'],
        partitioning_flavor='hive',
        existing_data_behavior='delete_matching',
        basename_template=f'{uuid4()}-{{i}}.parquet'
    )

It is probably a good idea to do this because, by calling
write_dataset once, the dataset writer will be able to group small
batches (which are often created as a result of partitioning) into
larger row groups before it writes them to disk.
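
If you also want explicit control over row-group sizes, recent pyarrow
releases expose min_rows_per_group / max_rows_per_group arguments on
write_dataset; a sketch, assuming your version supports them:

    ds.write_dataset(
        transform_scanner(scanner), out_arrow_path,
        schema=schema,
        format='parquet',
        partitioning=['partition'],
        partitioning_flavor='hive',
        # Accumulate at least ~64k rows before flushing a row group,
        # and cap each row group at ~1M rows.
        min_rows_per_group=64 * 1024,
        max_rows_per_group=1024 * 1024,
        basename_template=f'{uuid4()}-{{i}}.parquet'
    )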

One more thing I would suggest is to select the columns you want
before you convert to pandas.  Not every data type is zero-copy into
pandas.  So if you have a string column for example you will be paying
the cost of converting that to pandas for no reason.  That would look
like...

    def transform_scanner(scanner):
        for batch in scanner.to_batches():
            # 'select' is another method that hasn't been added to
            # pa.RecordBatch yet so we'll wrap in a table again
            table = pa.Table.from_batches([batch])
            table = table.select(cols)
            df = table.to_pandas()
            df['mean'] = df.mean(axis=1)
            # Only convert the new array back to arrow
            mean_arr = pa.Array.from_pandas(df['mean'])
            new_batch = add_column_to_batch(batch, mean_arr, 'mean')
            yield new_batch

> The latest arrow code base might have support for 'projection'

This was very recently added (thanks Joris!) in [1].  I'm not sure if
that is part of the current 7.0.0 RC or not.  This won't work, as you
pointed out, because there is no compute function for row-wise mean.
But if we had one it would look something like:

    add_val = pc.add(ds.field('a'), ds.field('b'))
    scanner = dataset.scanner(columns = {
        'a': ds.field('a'),
        'b': ds.field('b'),
        'c': add_val
    })

# In the future

There is some ongoing work that is related to what you are doing here.

## Tensors

When you have a lot of columns with the same data type and you want to
operate on them row-wise, it brings tensors (matrices) to mind.  I'm not
sure if this would be applicable to your use case or not.  For your
example this would mean treating the columns b, c as a 1x2 tensor so
you have a single column "bc" of tensors.

That being said, while there is some support for encoding and decoding
tensors into arrow, there are not yet any compute functions for
tensors.  So until that happens I don't know that you would be much
better off.
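
Just to make the idea concrete, here is a rough sketch of that encoding using
a plain fixed-size list column (not an official tensor extension type), with
the b/c values from your example:

    import numpy as np

    b = pa.array([2, 4, 6, 8], pa.float64())
    c = pa.array([10, 11, 12, 13], pa.float64())
    # Interleave the values row-wise: [b0, c0, b1, c1, ...]
    flat = pa.array(np.column_stack([b.to_numpy(), c.to_numpy()]).ravel())
    bc = pa.FixedSizeListArray.from_arrays(flat, 2)  # one length-2 "tensor" per row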

## Python UDFs

At some point I expect we will have support for using python UDFs
which would basically be a slightly altered version of what we did
above with the transform_batches method where we are still letting
pandas do the heavy lifting.  At the end of the day this wouldn't be a
lot different than what you are doing today (in terms of performance)
but we wouldn't have to rely on write_dataset accepting an iterable of
batches (because your scanner output would already be what you want).

[1] https://issues.apache.org/jira/browse/ARROW-12060


Re: [Python] add a new column to a table during dataset consolidation

Posted by Niranda Perera <ni...@gmail.com>.
Hi Antonio,
Sorry, I think I misunderstood your question. You are looking for a row-wise
mean, aren't you? I don't think there's an API for that in pyarrow.compute.
Sorry, my bad.
You could call `add` for each column and manually build the mean (this
would be a vectorized operation column-wise, but it would create at least 2
additional length-sized memory allocations AFAIU, because Arrow doesn't have
mutating methods).
I wasn't aware that the pyarrow API didn't have an add_column method (sorry
again!). It's available in the C++ API. But for that, too, you could simply
create a list with the existing columns.
The following would be my suggestion (not tested). But I agree, this is not
as pretty as the pandas solution! :-)
```
def calc_mean(batch, cols):
    res = batch[cols[0]]

    if len(cols) == 1:
        return res

    for c in cols[1:]:
        res = pa.compute.add(res, batch[c])

    # Cast before dividing so the result is float64 (matching the schema below)
    return pa.compute.divide(pa.compute.cast(res, pa.float64()), len(cols))

...

for batch in scanner.to_batches():
    new_cols = batch.columns
    new_cols.append(calc_mean(batch, cols))

    new_batch = pa.record_batch(
        data=new_cols,
        schema=batch.schema.append(pa.field('mean', pa.float64())))
    ...
```





-- 
Niranda Perera
https://niranda.dev/
@n1r44 <https://twitter.com/N1R44>

Re: [Python] add a new column to a table during dataset consolidation

Posted by Antonino Ingargiola <tr...@gmail.com>.
Hi Niranda,

On Mon, Jan 24, 2022 at 2:41 PM Niranda Perera <ni...@gmail.com>
wrote:

> Did you try using `pyarrow.compute` options? Inside that batch iterator
> loop you can call the compute mean function and then call the add_column
> method for record batches.
>

I cannot find how to pass multiple columns to be aggregated to
pyarrow.compute functions. As far as I understand pyarrow.compute functions
only accept a single 1D pyarrow.array as input. Maybe you had something
else in mind.

Besides, I don't see any add_column or append_column method for
pyarrow.RecordBatch[1]

[1] https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html

The only solution I see is calling the compute function for each row of the
RecordBatch (transforming each row to a pyarrow.array somehow). But this
would be quite inefficient. On the contrary, pandas can compute the
aggregation across columns in a vectorized way (at the additional cost of
pyarrow <-> pandas roundtrip conversion).

> The latest arrow code base might have support for 'projection', which
> could do this without having to iterate through record batches. @Weston
> Pace <we...@gmail.com> WDYT?
>

If this is possible it would be great!

Best,
Antonio

Re: [Python] add a new column to a table during dataset consolidation

Posted by Niranda Perera <ni...@gmail.com>.
Hi Antonio,

Did you try using `pyarrow.compute` options? Inside that batch iterator
loop you can call the compute mean function and then call the add_column
method for record batches.
The latest arrow code base might have support for 'projection', which
could do this without having to iterate through record batches. @Weston Pace
<we...@gmail.com> WDYT?



-- 
Niranda Perera
https://niranda.dev/
@n1r44 <https://twitter.com/N1R44>