Posted to dev@arrow.apache.org by Wes McKinney <we...@gmail.com> on 2020/05/01 18:30:36 UTC

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

I just wrote up a ticket about a general-purpose multi-consumer
scheduler API. Do you think this could be the beginning of a
resolution?

https://issues.apache.org/jira/browse/ARROW-8667

We may also want to design in some affordances so that no consumer is
ever 100% blocked, even if that causes temporary thread
oversubscription. For example, if we have a 16-thread underlying thread
pool and all 16 threads are in use by one consumer when a new consumer
shows up, we spawn a 17th thread to accommodate the new consumer
temporarily. Once some of the other tasks finish, we drop back down to
16 threads and share them fairly.
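
A minimal sketch of the oversubscription rule described above, assuming
bookkeeping only (no real threads are created). The class name
`SlotAccounting` and its methods are hypothetical and are not part of
Arrow or of the ARROW-8667 proposal. Fair sharing once the pool drains
back to its nominal size is not modeled here.

```python
from collections import Counter


class SlotAccounting:
    """Hypothetical bookkeeping for a pool that tolerates brief oversubscription."""

    def __init__(self, capacity):
        self.capacity = capacity      # nominal pool size, e.g. 16
        self.running = Counter()      # consumer -> tasks currently running

    def total_running(self):
        return sum(self.running.values())

    def oversubscribed(self):
        return self.total_running() > self.capacity

    def try_start(self, consumer):
        """Return True if a task for `consumer` may start now."""
        if self.total_running() < self.capacity:
            self.running[consumer] += 1
            return True
        # Pool is full: grant one extra slot only to a consumer that has
        # nothing running, so it is never 100% blocked.
        if self.running[consumer] == 0:
            self.running[consumer] += 1
            return True
        return False

    def finish(self, consumer):
        """Record that one task of `consumer` has completed."""
        self.running[consumer] -= 1
        if self.running[consumer] == 0:
            del self.running[consumer]
```

With a capacity of 16 and one consumer holding all 16 slots, a second
consumer's first task is admitted as a 17th slot; further tasks wait
until the total drops below 16 again.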

I haven't read the whole e-mail discussion but I will read more of it
and make some other comments.

On Thu, Apr 30, 2020 at 3:17 PM David Li <li...@gmail.com> wrote:
>
> Francois,
>
> Thanks for the pointers. I'll see if I can put together a
> proof-of-concept; might that help the discussion? I agree it would be good
> to make it format-agnostic. I'm also curious what thoughts you'd have
> on how to manage cross-file parallelism (coalescing only helps within
> a file). If we just naively start scanning fragments in parallel, we'd
> still want some way to help ensure the actual reads get issued roughly
> in order of file (to avoid the problem discussed above, where reads
> for file B prevent reads for file A from getting scheduled, where B
> follows A from the consumer's standpoint).
>
> Antoine,
>
> We would be interested in that as well. One thing we do want to
> investigate is a better ReadAsync() implementation for S3File as
> preliminary benchmarking on our side has shown it's quite inefficient
> (the default implementation makes lots of memcpy()s).
>
> Thanks,
> David
>
> On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
> >
> > If we want to discuss IO APIs we should do that comprehensively.
> > There are various ways of expressing what we want to do (explicit
> > readahead, fadvise-like APIs, async APIs, etc.).
> >
> > Regards
> >
> > Antoine.
> >
> >
> > On 30/04/2020 at 15:08, Francois Saint-Jacques wrote:
> >> One more point,
> >>
> >> It would seem beneficial if we could express this in a
> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
> >> buffering/coalescing would be needed. In the case of Parquet, we'd get
> >> the _exact_ ranges computed from the metadata. This method would also
> >> possibly benefit other filesystems, since on Linux it can call
> >> `readahead` and/or `madvise`.
> >>
> >> François
> >>
> >>
> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
> >> <fs...@gmail.com> wrote:
> >>>
> >>> Hello David,
> >>>
> >>> I think that what you ask is achievable with the dataset API without
> >>> much effort. You'd have to insert the pre-buffering at
> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
> >>> essentially a generator that looks like
> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
> >>> fragments in order. The application consuming the ScanTasks could
> >>> control the number of scheduled tasks by looking at the IO pool load.
> >>>
> >>> OTOH, it would be good if we could make this format-agnostic, e.g.
> >>> offer this via a ScanOptions toggle such as "readahead_files", which
> >>> would be applicable to all formats (CSV, IPC, ...).
> >>>
> >>> François
> >>> [1]
> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
> >>>
> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
> >>>>
> >>>> Sure, and we are still interested in collaborating. The main use case
> >>>> we have is scanning datasets in order of the partition key; it seems
> >>>> ordering is the only missing thing from Antoine's comments. However,
> >>>> from briefly playing around with the Python API, an application could
> >>>> manually order the fragments if so desired, so that still works for
> >>>> us, even if ordering isn't otherwise a guarantee.
> >>>>
> >>>> Performance-wise, we would want intra-file concurrency (coalescing)
> >>>> and inter-file concurrency (buffering files in order, as described in
> >>>> my previous messages). Even if Datasets doesn't directly handle this,
> >>>> it'd be ideal if an application could achieve this if it were willing
> >>>> to manage the details. I also vaguely remember seeing some interest in
> >>>> things like being able to distribute a computation over a dataset via
> >>>> Dask or some other distributed computation system, which would also be
> >>>> interesting to us, though not a concrete requirement.
> >>>>
> >>>> I'd like to reference the original proposal document, which has more
> >>>> detail on our workloads and use cases:
> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> >>>> As described there, we have a library that implements both a
> >>>> datasets-like API (hand it a remote directory, get back an Arrow
> >>>> Table) and several optimizations to make that library perform
> >>>> acceptably. Our motivation here is to be able to have a path to
> >>>> migrate to using and contributing to Arrow Datasets, which we see as a
> >>>> cross-language, cross-filesystem library, without regressing in
> >>>> performance. (We are limited to Python and S3.)
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
> >>>>>>
> >>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> >>>>>> guaranteed to download all the files in order, but with more control,
> >>>>>> you can make this more likely. You can also prevent the case where,
> >>>>>> due to scheduling, file N+1 doesn't even start downloading until
> >>>>>> after file N+2, which can happen if you just submit all reads to a
> >>>>>> thread pool, as demonstrated in the linked trace.
> >>>>>>
> >>>>>> And again, with this level of control, you can also decide to reduce
> >>>>>> or increase parallelism based on network conditions, memory usage,
> >>>>>> other readers, etc. So it is both about improving/smoothing out
> >>>>>> performance, and limiting resource consumption.
> >>>>>>
> >>>>>> Finally, I do not mean to propose that we necessarily build all of
> >>>>>> this into Arrow, just that we would like to make it possible to
> >>>>>> build this with Arrow, and that Datasets may find this interesting
> >>>>>> for its optimization purposes, if concurrent reads are a goal.
> >>>>>>
> >>>>>>>  Except that datasets are essentially unordered.
> >>>>>>
> >>>>>> I did not realize this, but that means it's not really suitable for
> >>>>>> our use case, unfortunately.
> >>>>>
> >>>>> It would be helpful to understand things a bit better so that we do
> >>>>> not miss out on an opportunity to collaborate. I don't know that the
> >>>>> current mode of some of the public Datasets APIs is a dogmatic
> >>>>> view about how everything should always work, and it's possible that
> >>>>> some relatively minor changes could allow you to use it. So let's try
> >>>>> not to close any doors right now.
> >>>>>
> >>>>>> Thanks,
> >>>>>> David
> >>>>>>
> >>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> >>>>>>>
> >>>>>>> On 29/04/2020 at 23:30, David Li wrote:
> >>>>>>>> Sure -
> >>>>>>>>
> >>>>>>>> The use case is to read a large partitioned dataset, consisting of
> >>>>>>>> tens or hundreds of Parquet files. A reader expects to scan through
> >>>>>>>> the data in order of the partition key. However, to improve
> >>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N+k
> >>>>>>>> while the consumer is still reading file N, so that it doesn't have
> >>>>>>>> to wait every time it opens a new file, and to help hide any latency
> >>>>>>>> or slowness that might be happening on the backend. We also don't
> >>>>>>>> want to be in a situation where file N+2 is ready but file N+1
> >>>>>>>> isn't, because that doesn't help us (we still have to wait for N+1
> >>>>>>>> to load).
> >>>>>>>
> >>>>>>> But depending on network conditions, you may very well get file N+2
> >>>>>>> before N+1, even if you start loading it after...
> >>>>>>>
> >>>>>>>> This is why I mention the project is quite similar to the Datasets
> >>>>>>>> project - Datasets likely covers all the functionality we would
> >>>>>>>> eventually need.
> >>>>>>>
> >>>>>>> Except that datasets are essentially unordered.
> >>>>>>>
> >>>>>>> Regards
> >>>>>>>
> >>>>>>> Antoine.
> >>>>>>>
> >>>>>
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
(Apologies for the double-email)

In the original coalescing PR, an "AsyncContext" abstraction was
discussed. I could imagine it holding arbitrary attributes/metrics for
tasks that a scheduler could then take into account, while making it
easier for applications to thread through all the different custom
attributes they care about without having to add parameters all over
Arrow.
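
As a rough illustration of such a context object, here is a sketch in
Python. It assumes nothing about the abstraction actually discussed in
the PR: the class shape, the `with_attributes` and `get` methods, and
the attribute names used below are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class AsyncContext:
    """Hypothetical bag of per-task attributes that a scheduler may inspect."""

    attributes: dict = field(default_factory=dict)

    def with_attributes(self, **extra):
        # Return a new context so that callers further up the stack never
        # see attributes added by the layers below them.
        merged = dict(self.attributes)
        merged.update(extra)
        return AsyncContext(merged)

    def get(self, key, default=None):
        return self.attributes.get(key, default)


# One layer tags the task with an ordering, another adds a size estimate.
# Both attribute names are made up for this example.
base = AsyncContext().with_attributes(file_index=3)
read_ctx = base.with_attributes(estimated_bytes=1 << 20)
```

A single context parameter passed through the IO calls would then carry
whatever attributes each layer chooses to attach.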

Best,
David

On 5/1/20, David Li <li...@gmail.com> wrote:
> Wes,
>
> From the outline there it seems like a path forward. I think the mode
> that would be helpful here is some sort of simple ordering/dependency
> so that the scheduler knows not to schedule subtasks of B until all
> subtasks of A have started (but not necessarily finished).
>
> I think the other part would be sizing the pool/number of concurrent
> tasks not just by number of tasks, but by some other metric (e.g. if a
> user attached a memory usage or bandwidth usage number to each task,
> and asked the scheduler to try to stay below some threshold).
>
> Ideally if both were somewhat application-pluggable (e.g. Datasets
> attaches an ordering to tasks and the coalescer attaches a memory
> usage metric, but it's up to the application to provide a scheduler
> that actually accounts for those), that would let applications do
> whatever wonky logic they need and limit the amount of bespoke things
> maintained in Arrow, at least until it's clear what sorts of
> strategies are commonly needed.
>
> Thanks,
> David

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Wes,

From the outline there it seems like a path forward. I think the mode
that would be helpful here is some sort of simple ordering/dependency
so that the scheduler knows not to schedule subtasks of B until all
subtasks of A have started (but not necessarily finished).
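
A minimal sketch of that ordering rule, assuming each subtask carries an
integer group order (for example, the index of its file) and that
popping a task from the queue corresponds to starting it. The
`OrderedTaskQueue` name is hypothetical. The guarantee only covers
subtasks that have already been submitted when the scheduler picks the
next one.

```python
import heapq
import itertools


class OrderedTaskQueue:
    """Hypothetical queue that starts lower-ordered groups' subtasks first."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker: keeps FIFO order within a group

    def submit(self, group_order, task):
        heapq.heappush(self._heap, (group_order, next(self._seq), task))

    def pop_next(self):
        """Return (group_order, task) for the next subtask to start."""
        group_order, _, task = heapq.heappop(self._heap)
        return group_order, task

    def __len__(self):
        return len(self._heap)
```

Starting a subtask does not wait for earlier subtasks to finish, so
subtasks of B can begin as soon as every pending subtask of A has been
handed to a worker.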

I think the other part would be sizing the pool/number of concurrent
tasks not just by number of tasks, but by some other metric (e.g. if a
user attached a memory usage or bandwidth usage number to each task,
and asked the scheduler to try to stay below some threshold).
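
A sketch of admission by metric rather than by task count, assuming each
task reports a single numeric cost such as estimated bytes in flight.
The `BudgetedAdmission` class is hypothetical. It always admits a task
when nothing is running, so a task larger than the whole budget cannot
stall forever.

```python
class BudgetedAdmission:
    """Hypothetical admission control that keeps total cost under a budget."""

    def __init__(self, budget):
        self.budget = budget
        self.in_use = 0

    def try_admit(self, cost):
        """Return True and reserve `cost` if the task may start now."""
        # Admit unconditionally when idle; otherwise stay within the budget.
        if self.in_use == 0 or self.in_use + cost <= self.budget:
            self.in_use += cost
            return True
        return False

    def release(self, cost):
        """Give back the cost of a finished task."""
        self.in_use -= cost
```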

Ideally if both were somewhat application-pluggable (e.g. Datasets
attaches an ordering to tasks and the coalescer attaches a memory
usage metric, but it's up to the application to provide a scheduler
that actually accounts for those), that would let applications do
whatever wonky logic they need and limit the amount of bespoke things
maintained in Arrow, at least until it's clear what sorts of
strategies are commonly needed.
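
One way to picture the pluggable split is sketched below, under the
assumption that tasks carry a free-form attribute dictionary and that
the application supplies a policy function. `Task`, `drain_once`,
`ordered_within_budget`, and the attribute names "order" and "bytes" are
all hypothetical and do not come from Arrow.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class Task:
    name: str
    attrs: Dict[str, float] = field(default_factory=dict)


# A policy returns the index of the pending task to start next, or None to wait.
Policy = Callable[[List[Task], List[Task]], Optional[int]]


def ordered_within_budget(budget: float) -> Policy:
    """Application-provided policy: lowest "order" first, capped by "bytes"."""

    def policy(pending: List[Task], running: List[Task]) -> Optional[int]:
        if not pending:
            return None
        i = min(range(len(pending)), key=lambda j: pending[j].attrs.get("order", 0))
        used = sum(t.attrs.get("bytes", 0) for t in running)
        if running and used + pending[i].attrs.get("bytes", 0) > budget:
            return None
        return i

    return policy


def drain_once(pending: List[Task], running: List[Task], policy: Policy) -> List[str]:
    """Start as many tasks as the policy allows; return their names in start order."""
    started = []
    while True:
        i = policy(pending, running)
        if i is None:
            return started
        task = pending.pop(i)
        running.append(task)
        started.append(task.name)
```

The library side only records attributes and calls the policy; the
ordering and the budget live entirely in application code.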

Thanks,
David

On 5/1/20, Wes McKinney <we...@gmail.com> wrote:
> I just wrote up a ticket about a general-purpose multi-consumer
> scheduler API. Do you think this could be the beginning of a
> resolution?
>
> https://issues.apache.org/jira/browse/ARROW-8667
>
> We may also want to design in some affordances so that no consumer is
> ever 100% blocked, even if that causes temporary thread
> oversubscription. For example, if we have a 16-thread underlying thread
> pool and all 16 threads are in use by one consumer when a new consumer
> shows up, we spawn a 17th thread to accommodate the new consumer
> temporarily. Once some of the other tasks finish, we drop back down to
> 16 threads and share them fairly.
>
> I haven't read the whole e-mail discussion but I will read more of it
> and make some other comments.