Posted to jira@arrow.apache.org by "Joris Van den Bossche (Jira)" <ji...@apache.org> on 2022/10/24 08:40:00 UTC

[jira] [Updated] (ARROW-16029) [Python] Runaway process with generator in "write_dataset()"

     [ https://issues.apache.org/jira/browse/ARROW-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joris Van den Bossche updated ARROW-16029:
------------------------------------------
    Priority: Critical  (was: Major)

> [Python] Runaway process with generator in "write_dataset()"
> ------------------------------------------------------------
>
>                 Key: ARROW-16029
>                 URL: https://issues.apache.org/jira/browse/ARROW-16029
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Martin Thøgersen
>            Priority: Critical
>
> We have a complex containerized data pipeline that keeps running, even if the main process fails, so we have to stop the containers manually. The issue boils down to the following:
> The method {{pyarrow.dataset.write_dataset()}} accepts an [iterable of RecordBatch|https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html].
> This means that generators should also work, since [a generator is an iterator, which is an iterable.|https://stackoverflow.com/questions/2776829/difference-between-pythons-generators-and-iterators]
> The following minimal example can't be stopped with Ctrl-C/KeyboardInterrupt (SIGINT, signal 2). We need at least {{killall -3 python}} (SIGQUIT) to stop the process.
> {code:python}
> from time import sleep
> import pyarrow as pa
> import pyarrow.dataset as ds
> # Infinite generator: yields one small RecordBatch every 0.1 s
> def mygenerator():
>     i = 0
>     while True:
>         sleep(0.1)
>         i = i + 1
>         print(i)
>         yield pa.RecordBatch.from_pylist([{"mycol": "myval"}] * 10)
> schema = pa.schema([("mycol", pa.string())])
> # Does NOT respect KeyboardInterrupt:
> ds.write_dataset(data=mygenerator(),
>                  schema=schema,
>                  base_dir="mydir",
>                  format="parquet",
>                  existing_data_behavior="overwrite_or_ignore",
>                  )
> {code}
> In practice the generator is not infinite; it wraps a series of API calls whose results can't all be held in memory at once (see the sketch below).
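> For illustration, the real-world shape is roughly the sketch below ({{fetch_page}} is a hypothetical stand-in for the actual API client, not part of our code):
> {code:python}
> import pyarrow as pa
>
> def fetch_page(page):
>     # Hypothetical stand-in for the real API client:
>     # pretend the API has 5 pages of results.
>     if page >= 5:
>         return []
>     return [{"mycol": "myval"}] * 10
>
> def batches_from_api():
>     # Each page of API results becomes one RecordBatch,
>     # so the full dataset never sits in memory.
>     page = 0
>     while True:
>         rows = fetch_page(page)
>         if not rows:
>             return
>         yield pa.RecordBatch.from_pylist(rows)
>         page += 1
> {code}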
> The following examples show that generators work well with e.g. {{pa.Table.from_batches()}}. So the issue could be limited to the Dataset API?
> {code:python}
> # Respects KeyboardInterrupt:
> for i in mygenerator():
>     pass
> {code}
> {code:python}
> # Respects KeyboardInterrupt:
> table = pa.Table.from_batches(mygenerator(), schema)
> {code}
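> A possible mitigation for anyone hitting this (a sketch, not verified): trap SIGINT with a custom handler that sets a flag, and have the generator stop yielding once the flag is set, so the write ends at the next batch boundary:
> {code:python}
> import signal
> from time import sleep
>
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> stop_requested = False
>
> def on_sigint(signum, frame):
>     # Replaces the default KeyboardInterrupt behaviour: just record
>     # that Ctrl-C was pressed so the generator can exit cleanly.
>     global stop_requested
>     stop_requested = True
>
> signal.signal(signal.SIGINT, on_sigint)
>
> def mygenerator():
>     while not stop_requested:
>         sleep(0.1)
>         yield pa.RecordBatch.from_pylist([{"mycol": "myval"}] * 10)
>
> schema = pa.schema([("mycol", pa.string())])
> ds.write_dataset(data=mygenerator(),
>                  schema=schema,
>                  base_dir="mydir",
>                  format="parquet",
>                  existing_data_behavior="overwrite_or_ignore",
>                  )
> {code}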
> OS: Ubuntu 20.04.3 LTS
> python: 3.8
> pyarrow: 7.0.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)