Posted to dev@arrow.apache.org by David Li <li...@gmail.com> on 2020/04/29 18:49:44 UTC

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Hi all,

I’d like to follow up on this discussion. Thanks to Antoine, we now
have a read-coalescing implementation in-tree that shows clear
performance benefits when reading both plain files and Parquet
files[1]. We also have some follow-up work whose design and
implementation we think could be of interest to the Datasets project.
Some context follows.

We’re using coalescing while loading multiple files in parallel, and
have found that we don’t get the expected speedup. We took the
straightforward approach: buffer up to N additional files in the
background using multiple threads, and read from each file in
sequence, adding more files to the tail as the files at the head are
consumed. This way, we begin loading data for future files while
deserializing/processing the current file.
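
For concreteness, here is a minimal sketch of that consumption
pattern in plain C++. std::async stands in for Arrow’s internal I/O
pool, and ReadWholeFile is a stand-in for opening a file and
deserializing it; in the real code, each file’s reads are further
split across the shared I/O pool, which is where the problem
described below appears.

#include <deque>
#include <fstream>
#include <future>
#include <sstream>
#include <string>
#include <vector>

// Stand-in for "load one file"; the real code goes through an Arrow
// RandomAccessFile and a Parquet/Feather reader instead.
std::string ReadWholeFile(const std::string& path) {
  std::ifstream in(path, std::ios::binary);
  std::ostringstream out;
  out << in.rdbuf();
  return out.str();
}

void ConsumeInOrder(const std::vector<std::string>& paths, size_t readahead) {
  std::deque<std::future<std::string>> pending;
  size_t next = 0;
  // Keep up to `readahead` files loading in the background.
  auto top_up = [&] {
    while (next < paths.size() && pending.size() < readahead) {
      pending.push_back(
          std::async(std::launch::async, ReadWholeFile, paths[next++]));
    }
  };
  top_up();
  while (!pending.empty()) {
    std::string contents = pending.front().get();  // block on the head file only
    pending.pop_front();
    top_up();  // start another file before processing this one
    // ... deserialize/process `contents` here ...
  }
}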

However, we noticed this approach doesn’t actually bring the expected
benefits. Consider files A, B, and C being buffered in parallel: right
now, all I/O goes through an internal I/O pool, so several operations
for each of the three files get added to the pool. These operations
get serviced in an arbitrary order, so it’s possible for file C to
finish all of its I/O before file B does. A consumer that needs file B
next is then stuck waiting unnecessarily for B’s reads to complete.

As a demonstration, see the third chart in this visualization:
https://www.lidavidm.me/arrow/coalescing/vis.html For context: this is
a plot of trace spans, where x-axis is time, y-axis is thread ID, and
color-coding indicates which file the span is associated with. As you
can see, there is a large gap because file_1.parquet gets scheduled
after file_2.parquet and file_3.parquet, and so the consumer thread
(row 3 from the bottom) is stuck waiting for it the whole time.

Our proposed approach is to give the application enough control that
it can coordinate I/O across separate files. Returning to the example
above, if the application could discover and execute the I/O
operations for a Parquet file (or FeatherV2, etc.) separately, it
could hold off on executing I/O for file C until all I/O for file B
has at least started. For example, in the current
framework, if we could tell how many I/O calls would happen when
reading from a Parquet file, we could insert a countdown latch in
between the Parquet file and the RandomAccessFile, which enqueues
reads and prevents them from progressing until all reads for the
previous file have made it through.
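
A minimal sketch of such a latch (plain C++, not existing Arrow
code): it is constructed with the number of reads expected for file
B, each of B’s reads counts it down as it is enqueued, and a wrapper
around file C’s RandomAccessFile calls Wait() before forwarding any
read to the underlying file.

#include <condition_variable>
#include <cstdint>
#include <mutex>

// Countdown latch gating the reads of the *next* file on the reads
// of the *previous* file having been issued.
class ReadLatch {
 public:
  explicit ReadLatch(int64_t expected_reads) : remaining_(expected_reads) {}

  // Called once per read issued for the previous file.
  void CountDown() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (--remaining_ <= 0) cv_.notify_all();
  }

  // Called by the next file's reads before they are allowed to proceed.
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return remaining_ <= 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int64_t remaining_;
};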

This has additional benefits: the application can lower concurrency
to limit memory usage, or, conversely, if it notices many small reads
being issued, it can raise concurrency to keep bandwidth utilization
high while remaining confident that it isn’t using additional memory.
By controlling the ordering, the data consumer also gets more
consistent (less variable) read performance.
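
As an illustration of that kind of control (again just a sketch, and
assuming each individual read fits within the budget), the
application could size in-flight I/O by a byte budget rather than a
fixed task count:

#include <condition_variable>
#include <cstdint>
#include <mutex>

// A shared byte budget: a read Acquire()s its estimated size before
// being issued and Release()s it once its buffer has been consumed,
// so many small reads may run concurrently while large reads are
// throttled.
class ByteBudget {
 public:
  explicit ByteBudget(int64_t limit_bytes) : available_(limit_bytes) {}

  void Acquire(int64_t nbytes) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [&] { return available_ >= nbytes; });
    available_ -= nbytes;
  }

  void Release(int64_t nbytes) {
    std::lock_guard<std::mutex> lock(mutex_);
    available_ += nbytes;
    cv_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int64_t available_;
};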

Concretely, for the PR adding coalescing to Parquet[1], I’d want to
rework it so that the Parquet reader can be asked for the byte ranges
corresponding to a given set of row groups and column indices. Then,
I’d like to change the reader APIs to optionally accept a
pre-populated ReadCache, so that the application can fill it however
it wants. Any further control over I/O would be done by the
application, by wrapping RandomAccessFile as appropriate.
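
To make that concrete, a hypothetical shape for those two pieces
might look like the following - the type and method names here
(RangeAwareParquetReader, GetReadRanges, SetReadCache) are
illustrative only, not the existing Arrow/Parquet C++ API:

#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical interfaces for illustration only.
struct ReadRange {
  int64_t offset;
  int64_t length;
};

class ReadCache {
 public:
  virtual ~ReadCache() = default;
  // The application decides when these reads actually execute, and
  // with what concurrency/ordering, before handing the cache back.
  virtual void Cache(const std::vector<ReadRange>& ranges) = 0;
};

class RangeAwareParquetReader {
 public:
  virtual ~RangeAwareParquetReader() = default;
  // Step 1: discover the exact byte ranges a scan would touch.
  virtual std::vector<ReadRange> GetReadRanges(
      const std::vector<int>& row_groups,
      const std::vector<int>& column_indices) = 0;
  // Step 2: read through a cache the application has already populated.
  virtual void SetReadCache(std::shared_ptr<ReadCache> cache) = 0;
};

The application would call GetReadRanges() up front, decide when and
in what order to execute those reads (coalesced, latched behind the
previous file, and so on), populate the cache, and only then let the
reader proceed via SetReadCache().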

We realize Datasets could do a lot of this, especially as it already
works with multiple files and Amazon S3 and has a JNI binding in
progress (Java being the main impetus of our current project).
However, we’d feel more comfortable building on Datasets once its API
is more stable. In the meantime, we would still be interested in
pursuing this as an improvement to Datasets. I’m not yet familiar
enough with the project and its roadmap to know how this would fit in
- but does this sound like something the project would want to
address eventually?

Best,
David

[1]: https://github.com/apache/arrow/pull/6744#issuecomment-607431959
Note some caveats on the numbers there - the rates are only correct
relative to each other since the benchmark doesn’t measure the actual
size of the deserialized data.


On 3/23/20, David Li <li...@gmail.com> wrote:
> Thanks. I've set up an AWS account for my own testing for now. I've
> also submitted a PR to add a basic benchmark which can be run
> self-contained, against a local Minio instance, or against S3:
> https://github.com/apache/arrow/pull/6675
>
> I ran the benchmark from my local machine, and I can test from EC2
> sometime as well. Performance is not ideal, but I'm being limited by
> my home internet connection - coalescing small chunked reads is (as
> expected) as fast as reading the file in one go, and in the PR
> (testing against localhost where we're not limited by bandwidth), it's
> faster than either option.
>
> ----------------------------------------------------------------------------------
> Benchmark                                           Time           CPU  Iterations
> ----------------------------------------------------------------------------------
> MinioFixture/ReadAll1Mib/real_time           223416933 ns    9098743 ns         413   4.47594MB/s    4.47594 items/s
> MinioFixture/ReadAll100Mib/real_time        6068938152 ns  553319299 ns          10   16.4773MB/s   0.164773 items/s
> MinioFixture/ReadAll500Mib/real_time       30735046155 ns 2620718364 ns           2   16.2681MB/s  0.0325361 items/s
> MinioFixture/ReadChunked100Mib/real_time    9625661666 ns  448637141 ns          12   10.3889MB/s   0.103889 items/s
> MinioFixture/ReadChunked500Mib/real_time   58736796101 ns 2070237834 ns           2   8.51255MB/s  0.0170251 items/s
> MinioFixture/ReadCoalesced100Mib/real_time  6982902546 ns   22553824 ns          10   14.3207MB/s   0.143207 items/s
> MinioFixture/ReadCoalesced500Mib/real_time 29923239648 ns  112736805 ns           3   16.7094MB/s  0.0334188 items/s
> MinioFixture/ReadParquet250K/real_time     21934689795 ns 2052758161 ns           3   9.90955MB/s  0.0455899 items/s
>
> Best,
> David
>
>
> On 3/22/20, Wes McKinney <we...@gmail.com> wrote:
>> On Thu, Mar 19, 2020 at 10:04 AM David Li <li...@gmail.com> wrote:
>>>
>>> > That's why it's important that we set ourselves up to do performance
>>> > testing in a realistic environment in AWS rather than simulating it.
>>>
>>> For my clarification, what are the plans for this (if any)? I couldn't
>>> find any prior discussion, though it sounds like the discussion around
>>> cloud CI capacity would be one step towards this.
>>>
>>> In the short term we could make tests/benchmarks configurable to not
>>> point at a Minio instance so individual developers can at least try
>>> things.
>>
>> It probably makes sense to begin investing in somewhat portable
>> tooling to assist with running S3-related unit tests and benchmarks
>> inside AWS. This could include initial Parquet dataset generation and
>> other things.
>>
>> As far as testing, I'm happy to pay for some AWS costs (within
>> reason). AWS might be able to donate some credits to us also
>>
>>> Best,
>>> David
>>>
>>> On 3/18/20, David Li <li...@gmail.com> wrote:
>>> > For us it applies to S3-like systems, not only S3 itself, at least.
>>> >
>>> > It does make sense to limit it to some filesystems. The behavior would
>>> > be opt-in at the Parquet reader level, so at the Datasets or
>>> > Filesystem layer we can take care of enabling the flag for filesystems
>>> > where it actually helps.
>>> >
>>> > I've filed these issues:
>>> > - ARROW-8151 to benchmark S3File+Parquet
>>> > (https://issues.apache.org/jira/browse/ARROW-8151)
>>> > - ARROW-8152 to split large reads
>>> > (https://issues.apache.org/jira/browse/ARROW-8152)
>>> > - PARQUET-1820 to use a column filter hint with coalescing
>>> > (https://issues.apache.org/jira/browse/PARQUET-1820)
>>> >
>>> > in addition to PARQUET-1698 which is just about pre-buffering the
>>> > entire row group (which we can now do with ARROW-7995).
>>> >
>>> > Best,
>>> > David
>>> >
>>> > On 3/18/20, Antoine Pitrou <an...@python.org> wrote:
>>> >>
>>> >> On 18/03/2020 at 18:30, David Li wrote:
>>> >>>> Instead of S3, you can use the Slow streams and Slow filesystem
>>> >>>> implementations.  It may better protect against varying external
>>> >>>> conditions.
>>> >>>
>>> >>> I think we'd want several different benchmarks - we want to ensure
>>> >>> we
>>> >>> don't regress local filesystem performance, and we also want to
>>> >>> measure in an actual S3 environment. It would also be good to
>>> >>> measure
>>> >>> S3-compatible systems like Google's.
>>> >>>
>>> >>>>> - Use the coalescing inside the Parquet reader (even without a
>>> >>>>> column
>>> >>>>> filter hint - this would subsume PARQUET-1698)
>>> >>>>
>>> >>>> I'm assuming this would be done at the RowGroupReader level, right?
>>> >>>
>>> >>> Ideally we'd be able to coalesce across row groups as well, though
>>> >>> maybe it'd be easier to start with within-row-group-only (I need to
>>> >>> familiarize myself with the reader more).
>>> >>>
>>> >>>> I don't understand what the "advantage" would be.  Can you
>>> >>>> elaborate?
>>> >>>
>>> >>> As Wes said, empirically you can get more bandwidth out of S3 with
>>> >>> multiple concurrent HTTP requests. There is a cost to doing so
>>> >>> (establishing a new connection takes time), hence why the coalescing
>>> >>> tries to group small reads (to fully utilize one connection) and
>>> >>> split
>>> >>> large reads (to be able to take advantage of multiple connections).
>>> >>
>>> >> If that's S3-specific (or even AWS-specific) it might better be done
>>> >> inside the S3 filesystem.  For other filesystems I don't think it
>>> >> makes
>>> >> sense to split reads.
>>> >>
>>> >> Regards
>>> >>
>>> >> Antoine.
>>> >>
>>> >
>>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
(Apologies for the double-email)

In the original coalescing PR, an "AsyncContext" abstraction was
discussed. I could imagine it holding arbitrary attributes/metrics
for tasks that a scheduler could then take into account, while also
making it easier for applications to thread through whatever custom
attributes they care about without having to add parameters all over
Arrow.
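
As a rough sketch of what I mean (hypothetical names, not an existing
Arrow API), tasks could carry an opaque attribute bag that a
pluggable scheduler is free to interpret:

#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical "AsyncContext"-style attribute bag attached to a task.
class AsyncContext {
 public:
  void SetAttribute(const std::string& key, int64_t value) {
    attributes_[key] = value;
  }

  // Returns `fallback` if the attribute was never set.
  int64_t GetAttribute(const std::string& key, int64_t fallback) const {
    auto it = attributes_.find(key);
    return it == attributes_.end() ? fallback : it->second;
  }

 private:
  std::unordered_map<std::string, int64_t> attributes_;
};

For example, the coalescer might attach an estimated byte count and
Datasets a file/partition index; an application-supplied scheduler
could then cap the total estimated bytes in flight and prefer lower
indices, without Arrow itself having to know about either attribute.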

Best,
David

On 5/1/20, David Li <li...@gmail.com> wrote:
> Wes,
>
> From the outline there it seems like a path forward. I think the mode
> that would be helpful here is some sort of simple ordering/dependency
> so that the scheduler knows not to schedule subtasks of B until all
> subtasks of A have started (but not necessarily finished).
>
> I think the other part would be sizing the pool/number of concurrent
> tasks not just by number of tasks, but by some other metric (e.g. if a
> user attached a memory usage or bandwidth usage number to each task,
> and asked the scheduler to try to stay below some threshold).
>
> Ideally if both were somewhat application-pluggable (e.g. Datasets
> attaches an ordering to tasks and the coalescer attaches a memory
> usage metric, but it's up to the application to provide a scheduler
> that actually accounts for those), that would let applications do
> whatever wonky logic they need and limit the amount of bespoke things
> maintained in Arrow, at least until it's clear what sorts of
> strategies are commonly needed.
>
> Thanks,
> David
>
> On 5/1/20, Wes McKinney <we...@gmail.com> wrote:
>> I just wrote up a ticket about a general purpose multi-consumer
>> scheduler API, do you think this could be the beginning of a
>> resolution?
>>
>> https://issues.apache.org/jira/browse/ARROW-8667
>>
>> We may also want to design in some affordances so that no consumer is
>> ever 100% blocked, even if that causes temporary thread
>> oversubscription (e.g. if we have a 16-thread underlying thread pool,
>> and they're all being used by one consumer, then a new consumer shows
>> up, we spawn a 17-th thread to accommodate the new consumer
>> temporarily until some of the other tasks finish, at which point we
>> drop down to 16 threads again, but share them fairly)
>>
>> I haven't read the whole e-mail discussion but I will read more of it
>> and make some other comments.
>>
>> On Thu, Apr 30, 2020 at 3:17 PM David Li <li...@gmail.com> wrote:
>>>
>>> Francois,
>>>
>>> Thanks for the pointers. I'll see if I can put together a
>>> proof-of-concept, might that help discussion? I agree it would be good
>>> to make it format-agnostic. I'm also curious what thoughts you'd have
>>> on how to manage cross-file parallelism (coalescing only helps within
>>> a file). If we just naively start scanning fragments in parallel, we'd
>>> still want some way to help ensure the actual reads get issued roughly
>>> in order of file (to avoid the problem discussed above, where reads
>>> for file B prevent reads for file A from getting scheduled, where B
>>> follows A from the consumer's standpoint).
>>>
>>> Antoine,
>>>
>>> We would be interested in that as well. One thing we do want to
>>> investigate is a better ReadAsync() implementation for S3File as
>>> preliminary benchmarking on our side has shown it's quite inefficient
>>> (the default implementation makes lots of memcpy()s).
>>>
>>> Thanks,
>>> David
>>>
>>> On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
>>> >
>>> > If we want to discuss IO APIs we should do that comprehensively.
>>> > There are various ways of expressing what we want to do (explicit
>>> > readahead, fadvise-like APIs, async APIs, etc.).
>>> >
>>> > Regards
>>> >
>>> > Antoine.
>>> >
>>> >
>>> >> On 30/04/2020 at 15:08, Francois Saint-Jacques wrote:
>>> >> One more point,
>>> >>
>>> >> It would seem beneficial if we could express this in
>>> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
>>> >> buffering/coalescing would be needed. In the case of Parquet, we'd
>>> >> get
>>> >> the _exact_ ranges computed from the metadata. This method would also
>>> >> possibly benefit other filesystems since on linux it can call
>>> >> `readahead` and/or `madvise`.
>>> >>
>>> >> François
>>> >>
>>> >>
>>> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
>>> >> <fs...@gmail.com> wrote:
>>> >>>
>>> >>> Hello David,
>>> >>>
>>> >>> I think that what you ask is achievable with the dataset API without
>>> >>> much effort. You'd have to insert the pre-buffering at
>>> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method
>>> >>> is
>>> >>> essentially a generator that looks like
>>> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>). It consumes the
>>> >>> fragment in-order. The application consuming the ScanTask could
>>> >>> control the number of scheduled tasks by looking at the IO pool
>>> >>> load.
>>> >>>
>>> >>> OTOH, It would be good if we could make this format agnostic, e.g.
>>> >>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
>>> >>> would be applicable to all formats, CSV, ipc, ...
>>> >>>
>>> >>> François
>>> >>> [1]
>>> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>>> >>>
>>> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com>
>>> >>> wrote:
>>> >>>>
>>> >>>> Sure, and we are still interested in collaborating. The main use
>>> >>>> case
>>> >>>> we have is scanning datasets in order of the partition key; it
>>> >>>> seems
>>> >>>> ordering is the only missing thing from Antoine's comments.
>>> >>>> However,
>>> >>>> from briefly playing around with the Python API, an application
>>> >>>> could
>>> >>>> manually order the fragments if so desired, so that still works for
>>> >>>> us, even if ordering isn't otherwise a guarantee.
>>> >>>>
>>> >>>> Performance-wise, we would want intra-file concurrency (coalescing)
>>> >>>> and inter-file concurrency (buffering files in order, as described
>>> >>>> in
>>> >>>> my previous messages). Even if Datasets doesn't directly handle
>>> >>>> this,
>>> >>>> it'd be ideal if an application could achieve this if it were
>>> >>>> willing
>>> >>>> to manage the details. I also vaguely remember seeing some interest
>>> >>>> in
>>> >>>> things like being able to distribute a computation over a dataset
>>> >>>> via
>>> >>>> Dask or some other distributed computation system, which would also
>>> >>>> be
>>> >>>> interesting to us, though not a concrete requirement.
>>> >>>>
>>> >>>> I'd like to reference the original proposal document, which has
>>> >>>> more
>>> >>>> detail on our workloads and use cases:
>>> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
>>> >>>> As described there, we have a library that implements both a
>>> >>>> datasets-like API (hand it a remote directory, get back an Arrow
>>> >>>> Table) and several optimizations to make that library perform
>>> >>>> acceptably. Our motivation here is to be able to have a path to
>>> >>>> migrate to using and contributing to Arrow Datasets, which we see
>>> >>>> as
>>> >>>> a
>>> >>>> cross-language, cross-filesystem library, without regressing in
>>> >>>> performance. (We are limited to Python and S3.)
>>> >>>>
>>> >>>> Best,
>>> >>>> David
>>> >>>>
>>> >>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
>>> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com>
>>> >>>>> wrote:
>>> >>>>>>
>>> >>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>>> >>>>>> guaranteed to download all the files in order, but with more
>>> >>>>>> control,
>>> >>>>>> you can make this more likely. You can also prevent the case
>>> >>>>>> where
>>> >>>>>> due
>>> >>>>>> to scheduling, file N+1 doesn't even start downloading until
>>> >>>>>> after
>>> >>>>>> file N+2, which can happen if you just submit all reads to a
>>> >>>>>> thread
>>> >>>>>> pool, as demonstrated in the linked trace.
>>> >>>>>>
>>> >>>>>> And again, with this level of control, you can also decide to
>>> >>>>>> reduce
>>> >>>>>> or increase parallelism based on network conditions, memory
>>> >>>>>> usage,
>>> >>>>>> other readers, etc. So it is both about improving/smoothing out
>>> >>>>>> performance, and limiting resource consumption.
>>> >>>>>>
>>> >>>>>> Finally, I do not mean to propose that we necessarily build all
>>> >>>>>> of
>>> >>>>>> this into Arrow, just that we would like to make it possible
>>> >>>>>> to
>>> >>>>>> build this with Arrow, and that Datasets may find this
>>> >>>>>> interesting
>>> >>>>>> for
>>> >>>>>> its optimization purposes, if concurrent reads are a goal.
>>> >>>>>>
>>> >>>>>>>  Except that datasets are essentially unordered.
>>> >>>>>>
>>> >>>>>> I did not realize this, but that means it's not really suitable
>>> >>>>>> for
>>> >>>>>> our use case, unfortunately.
>>> >>>>>
>>> >>>>> It would be helpful to understand things a bit better so that we
>>> >>>>> do
>>> >>>>> not miss out on an opportunity to collaborate. I don't know that
>>> >>>>> the
>>> >>>>> current mode of some of the public Datasets APIs is a dogmatic
>>> >>>>> view about how everything should always work, and it's possible
>>> >>>>> that
>>> >>>>> some relatively minor changes could allow you to use it. So let's
>>> >>>>> try
>>> >>>>> not to be closing any doors right now
>>> >>>>>
>>> >>>>>> Thanks,
>>> >>>>>> David
>>> >>>>>>
>>> >>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>>> >>>>>>>
>>> >>>>>>> On 29/04/2020 at 23:30, David Li wrote:
>>> >>>>>>>> Sure -
>>> >>>>>>>>
>>> >>>>>>>> The use case is to read a large partitioned dataset, consisting
>>> >>>>>>>> of
>>> >>>>>>>> tens or hundreds of Parquet files. A reader expects to scan
>>> >>>>>>>> through
>>> >>>>>>>> the data in order of the partition key. However, to improve
>>> >>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N +
>>> >>>>>>>> k
>>> >>>>>>>> while the consumer is still reading file N, so that it doesn't
>>> >>>>>>>> have
>>> >>>>>>>> to
>>> >>>>>>>> wait every time it opens a new file, and to help hide any
>>> >>>>>>>> latency
>>> >>>>>>>> or
>>> >>>>>>>> slowness that might be happening on the backend. We also don't
>>> >>>>>>>> want
>>> >>>>>>>> to
>>> >>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't,
>>> >>>>>>>> because
>>> >>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
>>> >>>>>>>
>>> >>>>>>> But depending on network conditions, you may very well get file
>>> >>>>>>> N+2
>>> >>>>>>> before N+1, even if you start loading it after...
>>> >>>>>>>
>>> >>>>>>>> This is why I mention the project is quite similar to the
>>> >>>>>>>> Datasets
>>> >>>>>>>> project - Datasets likely covers all the functionality we would
>>> >>>>>>>> eventually need.
>>> >>>>>>>
>>> >>>>>>> Except that datasets are essentially unordered.
>>> >>>>>>>
>>> >>>>>>> Regards
>>> >>>>>>>
>>> >>>>>>> Antoine.
>>> >>>>>>>
>>> >>>>>
>>> >
>>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Wes,

From the outline there it seems like a path forward. I think the mode
that would be helpful here is some sort of simple ordering/dependency
so that the scheduler knows not to schedule subtasks of B until all
subtasks of A have started (but not necessarily finished).

I think the other part would be sizing the pool/number of concurrent
tasks not just by number of tasks, but by some other metric (e.g. if a
user attached a memory usage or bandwidth usage number to each task,
and asked the scheduler to try to stay below some threshold).

Ideally if both were somewhat application-pluggable (e.g. Datasets
attaches an ordering to tasks and the coalescer attaches a memory
usage metric, but it's up to the application to provide a scheduler
that actually accounts for those), that would let applications do
whatever wonky logic they need and limit the amount of bespoke things
maintained in Arrow, at least until it's clear what sorts of
strategies are commonly needed.

Thanks,
David

On 5/1/20, Wes McKinney <we...@gmail.com> wrote:
> I just wrote up a ticket about a general purpose multi-consumer
> scheduler API, do you think this could be the beginning of a
> resolution?
>
> https://issues.apache.org/jira/browse/ARROW-8667
>
> We may also want to design in some affordances so that no consumer is
> ever 100% blocked, even if that causes temporary thread
> oversubscription (e.g. if we have a 16-thread underlying thread pool,
> and they're all being used by one consumer, then a new consumer shows
> up, we spawn a 17-th thread to accommodate the new consumer
> temporarily until some of the other tasks finish, at which point we
> drop down to 16 threads again, but share them fairly)
>
> I haven't read the whole e-mail discussion but I will read more of it
> and make some other comments.
>
> On Thu, Apr 30, 2020 at 3:17 PM David Li <li...@gmail.com> wrote:
>>
>> Francois,
>>
>> Thanks for the pointers. I'll see if I can put together a
>> proof-of-concept, might that help discussion? I agree it would be good
>> to make it format-agnostic. I'm also curious what thoughts you'd have
>> on how to manage cross-file parallelism (coalescing only helps within
>> a file). If we just naively start scanning fragments in parallel, we'd
>> still want some way to help ensure the actual reads get issued roughly
>> in order of file (to avoid the problem discussed above, where reads
>> for file B prevent reads for file A from getting scheduled, where B
>> follows A from the consumer's standpoint).
>>
>> Antoine,
>>
>> We would be interested in that as well. One thing we do want to
>> investigate is a better ReadAsync() implementation for S3File as
>> preliminary benchmarking on our side has shown it's quite inefficient
>> (the default implementation makes lots of memcpy()s).
>>
>> Thanks,
>> David
>>
>> On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
>> >
>> > If we want to discuss IO APIs we should do that comprehensively.
>> > There are various ways of expressing what we want to do (explicit
>> > readahead, fadvise-like APIs, async APIs, etc.).
>> >
>> > Regards
>> >
>> > Antoine.
>> >
>> >
>> > Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
>> >> One more point,
>> >>
>> >> It would seem beneficial if we could express this in
>> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
>> >> buffering/coalescing would be needed. In the case of Parquet, we'd get
>> >> the _exact_ ranges computed from the medata.This method would also
>> >> possibly benefit other filesystems since on linux it can call
>> >> `readahead` and/or `madvise`.
>> >>
>> >> François
>> >>
>> >>
>> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
>> >> <fs...@gmail.com> wrote:
>> >>>
>> >>> Hello David,
>> >>>
>> >>> I think that what you ask is achievable with the dataset API without
>> >>> much effort. You'd have to insert the pre-buffering at
>> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method
>> >>> is
>> >>> essentially a generator that looks like
>> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>). It consumes the
>> >>> fragment in-order. The application consuming the ScanTask could
>> >>> control the number of scheduled tasks by looking at the IO pool load.
>> >>>
>> >>> OTOH, It would be good if we could make this format agnostic, e.g.
>> >>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
>> >>> would be applicable to all formats, CSV, ipc, ...
>> >>>
>> >>> François
>> >>> [1]
>> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>> >>>
>> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> Sure, and we are still interested in collaborating. The main use
>> >>>> case
>> >>>> we have is scanning datasets in order of the partition key; it seems
>> >>>> ordering is the only missing thing from Antoine's comments. However,
>> >>>> from briefly playing around with the Python API, an application
>> >>>> could
>> >>>> manually order the fragments if so desired, so that still works for
>> >>>> us, even if ordering isn't otherwise a guarantee.
>> >>>>
>> >>>> Performance-wise, we would want intra-file concurrency (coalescing)
>> >>>> and inter-file concurrency (buffering files in order, as described
>> >>>> in
>> >>>> my previous messages). Even if Datasets doesn't directly handle
>> >>>> this,
>> >>>> it'd be ideal if an application could achieve this if it were
>> >>>> willing
>> >>>> to manage the details. I also vaguely remember seeing some interest
>> >>>> in
>> >>>> things like being able to distribute a computation over a dataset
>> >>>> via
>> >>>> Dask or some other distributed computation system, which would also
>> >>>> be
>> >>>> interesting to us, though not a concrete requirement.
>> >>>>
>> >>>> I'd like to reference the original proposal document, which has more
>> >>>> detail on our workloads and use cases:
>> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
>> >>>> As described there, we have a library that implements both a
>> >>>> datasets-like API (hand it a remote directory, get back an Arrow
>> >>>> Table) and several optimizations to make that library perform
>> >>>> acceptably. Our motivation here is to be able to have a path to
>> >>>> migrate to using and contributing to Arrow Datasets, which we see as
>> >>>> a
>> >>>> cross-language, cross-filesystem library, without regressing in
>> >>>> performance. (We are limited to Python and S3.)
>> >>>>
>> >>>> Best,
>> >>>> David
>> >>>>
>> >>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
>> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>> >>>>>> guaranteed to download all the files in order, but with more
>> >>>>>> control,
>> >>>>>> you can make this more likely. You can also prevent the case where
>> >>>>>> due
>> >>>>>> to scheduling, file N+1 doesn't even start downloading until after
>> >>>>>> file N+2, which can happen if you just submit all reads to a
>> >>>>>> thread
>> >>>>>> pool, as demonstrated in the linked trace.
>> >>>>>>
>> >>>>>> And again, with this level of control, you can also decide to
>> >>>>>> reduce
>> >>>>>> or increase parallelism based on network conditions, memory usage,
>> >>>>>> other readers, etc. So it is both about improving/smoothing out
>> >>>>>> performance, and limiting resource consumption.
>> >>>>>>
>> >>>>>> Finally, I do not mean to propose that we necessarily build all of
>> >>>>>> this into Arrow, just that it we would like to make it possible to
>> >>>>>> build this with Arrow, and that Datasets may find this interesting
>> >>>>>> for
>> >>>>>> its optimization purposes, if concurrent reads are a goal.
>> >>>>>>
>> >>>>>>>  Except that datasets are essentially unordered.
>> >>>>>>
>> >>>>>> I did not realize this, but that means it's not really suitable
>> >>>>>> for
>> >>>>>> our use case, unfortunately.
>> >>>>>
>> >>>>> It would be helpful to understand things a bit better so that we do
>> >>>>> not miss out on an opportunity to collaborate. I don't know that
>> >>>>> the
>> >>>>> current mode of the some of the public Datasets APIs is a dogmatic
>> >>>>> view about how everything should always work, and it's possible
>> >>>>> that
>> >>>>> some relatively minor changes could allow you to use it. So let's
>> >>>>> try
>> >>>>> not to be closing any doors right now
>> >>>>>
>> >>>>>> Thanks,
>> >>>>>> David
>> >>>>>>
>> >>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>> >>>>>>>
>> >>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
>> >>>>>>>> Sure -
>> >>>>>>>>
>> >>>>>>>> The use case is to read a large partitioned dataset, consisting
>> >>>>>>>> of
>> >>>>>>>> tens or hundreds of Parquet files. A reader expects to scan
>> >>>>>>>> through
>> >>>>>>>> the data in order of the partition key. However, to improve
>> >>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N +
>> >>>>>>>> k
>> >>>>>>>> while the consumer is still reading file N, so that it doesn't
>> >>>>>>>> have
>> >>>>>>>> to
>> >>>>>>>> wait every time it opens a new file, and to help hide any
>> >>>>>>>> latency
>> >>>>>>>> or
>> >>>>>>>> slowness that might be happening on the backend. We also don't
>> >>>>>>>> want
>> >>>>>>>> to
>> >>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't,
>> >>>>>>>> because
>> >>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
>> >>>>>>>
>> >>>>>>> But depending on network conditions, you may very well get file
>> >>>>>>> N+2
>> >>>>>>> before N+1, even if you start loading it after...
>> >>>>>>>
>> >>>>>>>> This is why I mention the project is quite similar to the
>> >>>>>>>> Datasets
>> >>>>>>>> project - Datasets likely covers all the functionality we would
>> >>>>>>>> eventually need.
>> >>>>>>>
>> >>>>>>> Except that datasets are essentially unordered.
>> >>>>>>>
>> >>>>>>> Regards
>> >>>>>>>
>> >>>>>>> Antoine.
>> >>>>>>>
>> >>>>>
>> >
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
I just wrote up a ticket about a general purpose multi-consumer
scheduler API, do you think this could be the beginning of a
resolution?

https://issues.apache.org/jira/browse/ARROW-8667

We may also want to design in some affordances so that no consumer is
ever 100% blocked, even if that causes temporary thread
oversubscription (e.g. if we have a 16-thread underlying thread pool,
and they're all being used by one consumer, then a new consumer shows
up, we spawn a 17-th thread to accommodate the new consumer
temporarily until some of the other tasks finish, at which point we
drop down to 16 threads again, but share them fairly)

I haven't read the whole e-mail discussion but I will read more of it
and make some other comments.

On Thu, Apr 30, 2020 at 3:17 PM David Li <li...@gmail.com> wrote:
>
> Francois,
>
> Thanks for the pointers. I'll see if I can put together a
> proof-of-concept, might that help discussion? I agree it would be good
> to make it format-agnostic. I'm also curious what thoughts you'd have
> on how to manage cross-file parallelism (coalescing only helps within
> a file). If we just naively start scanning fragments in parallel, we'd
> still want some way to help ensure the actual reads get issued roughly
> in order of file (to avoid the problem discussed above, where reads
> for file B prevent reads for file A from getting scheduled, where B
> follows A from the consumer's standpoint).
>
> Antoine,
>
> We would be interested in that as well. One thing we do want to
> investigate is a better ReadAsync() implementation for S3File as
> preliminary benchmarking on our side has shown it's quite inefficient
> (the default implementation makes lots of memcpy()s).
>
> Thanks,
> David
>
> On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
> >
> > If we want to discuss IO APIs we should do that comprehensively.
> > There are various ways of expressing what we want to do (explicit
> > readahead, fadvise-like APIs, async APIs, etc.).
> >
> > Regards
> >
> > Antoine.
> >
> >
> > Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
> >> One more point,
> >>
> >> It would seem beneficial if we could express this in
> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
> >> buffering/coalescing would be needed. In the case of Parquet, we'd get
> >> the _exact_ ranges computed from the medata.This method would also
> >> possibly benefit other filesystems since on linux it can call
> >> `readahead` and/or `madvise`.
> >>
> >> François
> >>
> >>
> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
> >> <fs...@gmail.com> wrote:
> >>>
> >>> Hello David,
> >>>
> >>> I think that what you ask is achievable with the dataset API without
> >>> much effort. You'd have to insert the pre-buffering at
> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
> >>> essentially a generator that looks like
> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>). It consumes the
> >>> fragment in-order. The application consuming the ScanTask could
> >>> control the number of scheduled tasks by looking at the IO pool load.
> >>>
> >>> OTOH, It would be good if we could make this format agnostic, e.g.
> >>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
> >>> would be applicable to all formats, CSV, ipc, ...
> >>>
> >>> François
> >>> [1]
> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
> >>>
> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
> >>>>
> >>>> Sure, and we are still interested in collaborating. The main use case
> >>>> we have is scanning datasets in order of the partition key; it seems
> >>>> ordering is the only missing thing from Antoine's comments. However,
> >>>> from briefly playing around with the Python API, an application could
> >>>> manually order the fragments if so desired, so that still works for
> >>>> us, even if ordering isn't otherwise a guarantee.
> >>>>
> >>>> Performance-wise, we would want intra-file concurrency (coalescing)
> >>>> and inter-file concurrency (buffering files in order, as described in
> >>>> my previous messages). Even if Datasets doesn't directly handle this,
> >>>> it'd be ideal if an application could achieve this if it were willing
> >>>> to manage the details. I also vaguely remember seeing some interest in
> >>>> things like being able to distribute a computation over a dataset via
> >>>> Dask or some other distributed computation system, which would also be
> >>>> interesting to us, though not a concrete requirement.
> >>>>
> >>>> I'd like to reference the original proposal document, which has more
> >>>> detail on our workloads and use cases:
> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> >>>> As described there, we have a library that implements both a
> >>>> datasets-like API (hand it a remote directory, get back an Arrow
> >>>> Table) and several optimizations to make that library perform
> >>>> acceptably. Our motivation here is to be able to have a path to
> >>>> migrate to using and contributing to Arrow Datasets, which we see as a
> >>>> cross-language, cross-filesystem library, without regressing in
> >>>> performance. (We are limited to Python and S3.)
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> >>>>>> guaranteed to download all the files in order, but with more control,
> >>>>>> you can make this more likely. You can also prevent the case where
> >>>>>> due
> >>>>>> to scheduling, file N+1 doesn't even start downloading until after
> >>>>>> file N+2, which can happen if you just submit all reads to a thread
> >>>>>> pool, as demonstrated in the linked trace.
> >>>>>>
> >>>>>> And again, with this level of control, you can also decide to reduce
> >>>>>> or increase parallelism based on network conditions, memory usage,
> >>>>>> other readers, etc. So it is both about improving/smoothing out
> >>>>>> performance, and limiting resource consumption.
> >>>>>>
> >>>>>> Finally, I do not mean to propose that we necessarily build all of
> >>>>>> this into Arrow, just that it we would like to make it possible to
> >>>>>> build this with Arrow, and that Datasets may find this interesting
> >>>>>> for
> >>>>>> its optimization purposes, if concurrent reads are a goal.
> >>>>>>
> >>>>>>>  Except that datasets are essentially unordered.
> >>>>>>
> >>>>>> I did not realize this, but that means it's not really suitable for
> >>>>>> our use case, unfortunately.
> >>>>>
> >>>>> It would be helpful to understand things a bit better so that we do
> >>>>> not miss out on an opportunity to collaborate. I don't know that the
> >>>>> current mode of the some of the public Datasets APIs is a dogmatic
> >>>>> view about how everything should always work, and it's possible that
> >>>>> some relatively minor changes could allow you to use it. So let's try
> >>>>> not to be closing any doors right now
> >>>>>
> >>>>>> Thanks,
> >>>>>> David
> >>>>>>
> >>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> >>>>>>>
> >>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
> >>>>>>>> Sure -
> >>>>>>>>
> >>>>>>>> The use case is to read a large partitioned dataset, consisting of
> >>>>>>>> tens or hundreds of Parquet files. A reader expects to scan through
> >>>>>>>> the data in order of the partition key. However, to improve
> >>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N + k
> >>>>>>>> while the consumer is still reading file N, so that it doesn't have
> >>>>>>>> to
> >>>>>>>> wait every time it opens a new file, and to help hide any latency
> >>>>>>>> or
> >>>>>>>> slowness that might be happening on the backend. We also don't want
> >>>>>>>> to
> >>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't,
> >>>>>>>> because
> >>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
> >>>>>>>
> >>>>>>> But depending on network conditions, you may very well get file N+2
> >>>>>>> before N+1, even if you start loading it after...
> >>>>>>>
> >>>>>>>> This is why I mention the project is quite similar to the Datasets
> >>>>>>>> project - Datasets likely covers all the functionality we would
> >>>>>>>> eventually need.
> >>>>>>>
> >>>>>>> Except that datasets are essentially unordered.
> >>>>>>>
> >>>>>>> Regards
> >>>>>>>
> >>>>>>> Antoine.
> >>>>>>>
> >>>>>
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Francois,

Thanks for the pointers. I'll see if I can put together a
proof-of-concept, might that help discussion? I agree it would be good
to make it format-agnostic. I'm also curious what thoughts you'd have
on how to manage cross-file parallelism (coalescing only helps within
a file). If we just naively start scanning fragments in parallel, we'd
still want some way to help ensure the actual reads get issued roughly
in order of file (to avoid the problem discussed above, where reads
for file B prevent reads for file A from getting scheduled, where B
follows A from the consumer's standpoint).

Antoine,

We would be interested in that as well. One thing we do want to
investigate is a better ReadAsync() implementation for S3File as
preliminary benchmarking on our side has shown it's quite inefficient
(the default implementation makes lots of memcpy()s).

Thanks,
David

On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
>
> If we want to discuss IO APIs we should do that comprehensively.
> There are various ways of expressing what we want to do (explicit
> readahead, fadvise-like APIs, async APIs, etc.).
>
> Regards
>
> Antoine.
>
>
> Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
>> One more point,
>>
>> It would seem beneficial if we could express this in
>> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
>> buffering/coalescing would be needed. In the case of Parquet, we'd get
>> the _exact_ ranges computed from the medata.This method would also
>> possibly benefit other filesystems since on linux it can call
>> `readahead` and/or `madvise`.
>>
>> François
>>
>>
>> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
>> <fs...@gmail.com> wrote:
>>>
>>> Hello David,
>>>
>>> I think that what you ask is achievable with the dataset API without
>>> much effort. You'd have to insert the pre-buffering at
>>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
>>> essentially a generator that looks like
>>> flatmap(Iterator<Fragment<Iterator<ScanTask>>). It consumes the
>>> fragment in-order. The application consuming the ScanTask could
>>> control the number of scheduled tasks by looking at the IO pool load.
>>>
>>> OTOH, It would be good if we could make this format agnostic, e.g.
>>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
>>> would be applicable to all formats, CSV, ipc, ...
>>>
>>> François
>>> [1]
>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>>>
>>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
>>>>
>>>> Sure, and we are still interested in collaborating. The main use case
>>>> we have is scanning datasets in order of the partition key; it seems
>>>> ordering is the only missing thing from Antoine's comments. However,
>>>> from briefly playing around with the Python API, an application could
>>>> manually order the fragments if so desired, so that still works for
>>>> us, even if ordering isn't otherwise a guarantee.
>>>>
>>>> Performance-wise, we would want intra-file concurrency (coalescing)
>>>> and inter-file concurrency (buffering files in order, as described in
>>>> my previous messages). Even if Datasets doesn't directly handle this,
>>>> it'd be ideal if an application could achieve this if it were willing
>>>> to manage the details. I also vaguely remember seeing some interest in
>>>> things like being able to distribute a computation over a dataset via
>>>> Dask or some other distributed computation system, which would also be
>>>> interesting to us, though not a concrete requirement.
>>>>
>>>> I'd like to reference the original proposal document, which has more
>>>> detail on our workloads and use cases:
>>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
>>>> As described there, we have a library that implements both a
>>>> datasets-like API (hand it a remote directory, get back an Arrow
>>>> Table) and several optimizations to make that library perform
>>>> acceptably. Our motivation here is to be able to have a path to
>>>> migrate to using and contributing to Arrow Datasets, which we see as a
>>>> cross-language, cross-filesystem library, without regressing in
>>>> performance. (We are limited to Python and S3.)
>>>>
>>>> Best,
>>>> David
>>>>
>>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
>>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>>>>>> guaranteed to download all the files in order, but with more control,
>>>>>> you can make this more likely. You can also prevent the case where
>>>>>> due
>>>>>> to scheduling, file N+1 doesn't even start downloading until after
>>>>>> file N+2, which can happen if you just submit all reads to a thread
>>>>>> pool, as demonstrated in the linked trace.
>>>>>>
>>>>>> And again, with this level of control, you can also decide to reduce
>>>>>> or increase parallelism based on network conditions, memory usage,
>>>>>> other readers, etc. So it is both about improving/smoothing out
>>>>>> performance, and limiting resource consumption.
>>>>>>
>>>>>> Finally, I do not mean to propose that we necessarily build all of
>>>>>> this into Arrow, just that it we would like to make it possible to
>>>>>> build this with Arrow, and that Datasets may find this interesting
>>>>>> for
>>>>>> its optimization purposes, if concurrent reads are a goal.
>>>>>>
>>>>>>>  Except that datasets are essentially unordered.
>>>>>>
>>>>>> I did not realize this, but that means it's not really suitable for
>>>>>> our use case, unfortunately.
>>>>>
>>>>> It would be helpful to understand things a bit better so that we do
>>>>> not miss out on an opportunity to collaborate. I don't know that the
>>>>> current mode of the some of the public Datasets APIs is a dogmatic
>>>>> view about how everything should always work, and it's possible that
>>>>> some relatively minor changes could allow you to use it. So let's try
>>>>> not to be closing any doors right now
>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>
>>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>>>>>>>
>>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
>>>>>>>> Sure -
>>>>>>>>
>>>>>>>> The use case is to read a large partitioned dataset, consisting of
>>>>>>>> tens or hundreds of Parquet files. A reader expects to scan through
>>>>>>>> the data in order of the partition key. However, to improve
>>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N + k
>>>>>>>> while the consumer is still reading file N, so that it doesn't have
>>>>>>>> to
>>>>>>>> wait every time it opens a new file, and to help hide any latency
>>>>>>>> or
>>>>>>>> slowness that might be happening on the backend. We also don't want
>>>>>>>> to
>>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't,
>>>>>>>> because
>>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
>>>>>>>
>>>>>>> But depending on network conditions, you may very well get file N+2
>>>>>>> before N+1, even if you start loading it after...
>>>>>>>
>>>>>>>> This is why I mention the project is quite similar to the Datasets
>>>>>>>> project - Datasets likely covers all the functionality we would
>>>>>>>> eventually need.
>>>>>>>
>>>>>>> Except that datasets are essentially unordered.
>>>>>>>
>>>>>>> Regards
>>>>>>>
>>>>>>> Antoine.
>>>>>>>
>>>>>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
If we want to discuss IO APIs we should do that comprehensively.
There are various ways of expressing what we want to do (explicit
readahead, fadvise-like APIs, async APIs, etc.).

Regards

Antoine.


Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
> One more point,
> 
> It would seem beneficial if we could express this in
> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
> buffering/coalescing would be needed. In the case of Parquet, we'd get
> the _exact_ ranges computed from the medata.This method would also
> possibly benefit other filesystems since on linux it can call
> `readahead` and/or `madvise`.
> 
> François
> 
> 
> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
> <fs...@gmail.com> wrote:
>>
>> Hello David,
>>
>> I think that what you ask is achievable with the dataset API without
>> much effort. You'd have to insert the pre-buffering at
>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
>> essentially a generator that looks like
>> flatmap(Iterator<Fragment<Iterator<ScanTask>>). It consumes the
>> fragment in-order. The application consuming the ScanTask could
>> control the number of scheduled tasks by looking at the IO pool load.
>>
>> OTOH, It would be good if we could make this format agnostic, e.g.
>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
>> would be applicable to all formats, CSV, ipc, ...
>>
>> François
>> [1] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>>
>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
>>>
>>> Sure, and we are still interested in collaborating. The main use case
>>> we have is scanning datasets in order of the partition key; it seems
>>> ordering is the only missing thing from Antoine's comments. However,
>>> from briefly playing around with the Python API, an application could
>>> manually order the fragments if so desired, so that still works for
>>> us, even if ordering isn't otherwise a guarantee.
>>>
>>> Performance-wise, we would want intra-file concurrency (coalescing)
>>> and inter-file concurrency (buffering files in order, as described in
>>> my previous messages). Even if Datasets doesn't directly handle this,
>>> it'd be ideal if an application could achieve this if it were willing
>>> to manage the details. I also vaguely remember seeing some interest in
>>> things like being able to distribute a computation over a dataset via
>>> Dask or some other distributed computation system, which would also be
>>> interesting to us, though not a concrete requirement.
>>>
>>> I'd like to reference the original proposal document, which has more
>>> detail on our workloads and use cases:
>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
>>> As described there, we have a library that implements both a
>>> datasets-like API (hand it a remote directory, get back an Arrow
>>> Table) and several optimizations to make that library perform
>>> acceptably. Our motivation here is to be able to have a path to
>>> migrate to using and contributing to Arrow Datasets, which we see as a
>>> cross-language, cross-filesystem library, without regressing in
>>> performance. (We are limited to Python and S3.)
>>>
>>> Best,
>>> David
>>>
>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
>>>>>
>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>>>>> guaranteed to download all the files in order, but with more control,
>>>>> you can make this more likely. You can also prevent the case where due
>>>>> to scheduling, file N+1 doesn't even start downloading until after
>>>>> file N+2, which can happen if you just submit all reads to a thread
>>>>> pool, as demonstrated in the linked trace.
>>>>>
>>>>> And again, with this level of control, you can also decide to reduce
>>>>> or increase parallelism based on network conditions, memory usage,
>>>>> other readers, etc. So it is both about improving/smoothing out
>>>>> performance, and limiting resource consumption.
>>>>>
>>>>> Finally, I do not mean to propose that we necessarily build all of
>>>>> this into Arrow, just that it we would like to make it possible to
>>>>> build this with Arrow, and that Datasets may find this interesting for
>>>>> its optimization purposes, if concurrent reads are a goal.
>>>>>
>>>>>>  Except that datasets are essentially unordered.
>>>>>
>>>>> I did not realize this, but that means it's not really suitable for
>>>>> our use case, unfortunately.
>>>>
>>>> It would be helpful to understand things a bit better so that we do
>>>> not miss out on an opportunity to collaborate. I don't know that the
>>>> current mode of the some of the public Datasets APIs is a dogmatic
>>>> view about how everything should always work, and it's possible that
>>>> some relatively minor changes could allow you to use it. So let's try
>>>> not to be closing any doors right now
>>>>
>>>>> Thanks,
>>>>> David
>>>>>
>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>>>>>>
>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
>>>>>>> Sure -
>>>>>>>
>>>>>>> The use case is to read a large partitioned dataset, consisting of
>>>>>>> tens or hundreds of Parquet files. A reader expects to scan through
>>>>>>> the data in order of the partition key. However, to improve
>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N + k
>>>>>>> while the consumer is still reading file N, so that it doesn't have to
>>>>>>> wait every time it opens a new file, and to help hide any latency or
>>>>>>> slowness that might be happening on the backend. We also don't want to
>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't, because
>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
>>>>>>
>>>>>> But depending on network conditions, you may very well get file N+2
>>>>>> before N+1, even if you start loading it after...
>>>>>>
>>>>>>> This is why I mention the project is quite similar to the Datasets
>>>>>>> project - Datasets likely covers all the functionality we would
>>>>>>> eventually need.
>>>>>>
>>>>>> Except that datasets are essentially unordered.
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>> Antoine.
>>>>>>
>>>>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Francois Saint-Jacques <fs...@gmail.com>.
One more point,

It would seem beneficial if we could express this in
`RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
buffering/coalescing would be needed. In the case of Parquet, we'd get
the _exact_ ranges computed from the medata.This method would also
possibly benefit other filesystems since on linux it can call
`readahead` and/or `madvise`.

François


On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
<fs...@gmail.com> wrote:
>
> Hello David,
>
> I think that what you ask is achievable with the dataset API without
> much effort. You'd have to insert the pre-buffering at
> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
> essentially a generator that looks like
> flatmap(Iterator<Fragment<Iterator<ScanTask>>). It consumes the
> fragment in-order. The application consuming the ScanTask could
> control the number of scheduled tasks by looking at the IO pool load.
>
> OTOH, It would be good if we could make this format agnostic, e.g.
> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
> would be applicable to all formats, CSV, ipc, ...
>
> François
> [1] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>
> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
> >
> > Sure, and we are still interested in collaborating. The main use case
> > we have is scanning datasets in order of the partition key; it seems
> > ordering is the only missing thing from Antoine's comments. However,
> > from briefly playing around with the Python API, an application could
> > manually order the fragments if so desired, so that still works for
> > us, even if ordering isn't otherwise a guarantee.
> >
> > Performance-wise, we would want intra-file concurrency (coalescing)
> > and inter-file concurrency (buffering files in order, as described in
> > my previous messages). Even if Datasets doesn't directly handle this,
> > it'd be ideal if an application could achieve this if it were willing
> > to manage the details. I also vaguely remember seeing some interest in
> > things like being able to distribute a computation over a dataset via
> > Dask or some other distributed computation system, which would also be
> > interesting to us, though not a concrete requirement.
> >
> > I'd like to reference the original proposal document, which has more
> > detail on our workloads and use cases:
> > https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> > As described there, we have a library that implements both a
> > datasets-like API (hand it a remote directory, get back an Arrow
> > Table) and several optimizations to make that library perform
> > acceptably. Our motivation here is to be able to have a path to
> > migrate to using and contributing to Arrow Datasets, which we see as a
> > cross-language, cross-filesystem library, without regressing in
> > performance. (We are limited to Python and S3.)
> >
> > Best,
> > David
> >
> > On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> > > On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
> > >>
> > >> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> > >> guaranteed to download all the files in order, but with more control,
> > >> you can make this more likely. You can also prevent the case where due
> > >> to scheduling, file N+1 doesn't even start downloading until after
> > >> file N+2, which can happen if you just submit all reads to a thread
> > >> pool, as demonstrated in the linked trace.
> > >>
> > >> And again, with this level of control, you can also decide to reduce
> > >> or increase parallelism based on network conditions, memory usage,
> > >> other readers, etc. So it is both about improving/smoothing out
> > >> performance, and limiting resource consumption.
> > >>
> > >> Finally, I do not mean to propose that we necessarily build all of
> > >> this into Arrow, just that we would like to make it possible to
> > >> build this with Arrow, and that Datasets may find this interesting for
> > >> its optimization purposes, if concurrent reads are a goal.
> > >>
> > >> >  Except that datasets are essentially unordered.
> > >>
> > >> I did not realize this, but that means it's not really suitable for
> > >> our use case, unfortunately.
> > >
> > > It would be helpful to understand things a bit better so that we do
> > > not miss out on an opportunity to collaborate. I don't know that the
> > > current mode of some of the public Datasets APIs is a dogmatic
> > > view about how everything should always work, and it's possible that
> > > some relatively minor changes could allow you to use it. So let's try
> > > not to be closing any doors right now.
> > >
> > >> Thanks,
> > >> David
> > >>
> > >> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> > >> >
> > >> > On 29/04/2020 at 23:30, David Li wrote:
> > >> >> Sure -
> > >> >>
> > >> >> The use case is to read a large partitioned dataset, consisting of
> > >> >> tens or hundreds of Parquet files. A reader expects to scan through
> > >> >> the data in order of the partition key. However, to improve
> > >> >> performance, we'd like to begin loading files N+1, N+2, ... N + k
> > >> >> while the consumer is still reading file N, so that it doesn't have to
> > >> >> wait every time it opens a new file, and to help hide any latency or
> > >> >> slowness that might be happening on the backend. We also don't want to
> > >> >> be in a situation where file N+2 is ready but file N+1 isn't, because
> > >> >> that doesn't help us (we still have to wait for N+1 to load).
> > >> >
> > >> > But depending on network conditions, you may very well get file N+2
> > >> > before N+1, even if you start loading it after...
> > >> >
> > >> >> This is why I mention the project is quite similar to the Datasets
> > >> >> project - Datasets likely covers all the functionality we would
> > >> >> eventually need.
> > >> >
> > >> > Except that datasets are essentially unordered.
> > >> >
> > >> > Regards
> > >> >
> > >> > Antoine.
> > >> >
> > >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Francois Saint-Jacques <fs...@gmail.com>.
Hello David,

I think that what you ask is achievable with the dataset API without
much effort. You'd have to insert the pre-buffering at
ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
essentially a generator that looks like
flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
fragments in order. The application consuming the ScanTasks could
control the number of scheduled tasks by looking at the IO pool load.

OTOH, it would be good if we could make this format-agnostic, e.g.
offer this via a ScanOptions toggle such as "readahead_files", which
would be applicable to all formats: CSV, IPC, ...
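
As a rough sketch of what such a toggle could do (standalone C++,
illustrative only; Table and LoadFn are placeholders, not Datasets types):
keep up to readahead_files fragments loading in the background, but always
block on the oldest one so results still come back in fragment order.

    #include <cstddef>
    #include <deque>
    #include <functional>
    #include <future>
    #include <vector>

    struct Table {};                        // stand-in for a loaded fragment
    using LoadFn = std::function<Table()>;  // stand-in for "scan one fragment"

    // Consume fragments in order while keeping up to `readahead_files`
    // loads in flight on background threads.
    template <typename Consume>
    void ScanWithReadahead(const std::vector<LoadFn>& fragments,
                           std::size_t readahead_files, Consume consume) {
      std::deque<std::future<Table>> in_flight;
      std::size_t next = 0;
      while (next < fragments.size() || !in_flight.empty()) {
        // Top up the window in submission order.
        while (next < fragments.size() && in_flight.size() < readahead_files) {
          in_flight.push_back(std::async(std::launch::async, fragments[next++]));
        }
        // Block only on the *oldest* fragment, preserving dataset order.
        consume(in_flight.front().get());
        in_flight.pop_front();
      }
    }

Blocking only on the oldest in-flight fragment is what keeps the consumer
from stalling on file N+1 while N+2 happens to finish first.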

François
[1] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401

On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
>
> Sure, and we are still interested in collaborating. The main use case
> we have is scanning datasets in order of the partition key; it seems
> ordering is the only missing thing from Antoine's comments. However,
> from briefly playing around with the Python API, an application could
> manually order the fragments if so desired, so that still works for
> us, even if ordering isn't otherwise a guarantee.
>
> Performance-wise, we would want intra-file concurrency (coalescing)
> and inter-file concurrency (buffering files in order, as described in
> my previous messages). Even if Datasets doesn't directly handle this,
> it'd be ideal if an application could achieve this if it were willing
> to manage the details. I also vaguely remember seeing some interest in
> things like being able to distribute a computation over a dataset via
> Dask or some other distributed computation system, which would also be
> interesting to us, though not a concrete requirement.
>
> I'd like to reference the original proposal document, which has more
> detail on our workloads and use cases:
> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> As described there, we have a library that implements both a
> datasets-like API (hand it a remote directory, get back an Arrow
> Table) and several optimizations to make that library perform
> acceptably. Our motivation here is to be able to have a path to
> migrate to using and contributing to Arrow Datasets, which we see as a
> cross-language, cross-filesystem library, without regressing in
> performance. (We are limited to Python and S3.)
>
> Best,
> David
>
> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> > On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
> >>
> >> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> >> guaranteed to download all the files in order, but with more control,
> >> you can make this more likely. You can also prevent the case where due
> >> to scheduling, file N+1 doesn't even start downloading until after
> >> file N+2, which can happen if you just submit all reads to a thread
> >> pool, as demonstrated in the linked trace.
> >>
> >> And again, with this level of control, you can also decide to reduce
> >> or increase parallelism based on network conditions, memory usage,
> >> other readers, etc. So it is both about improving/smoothing out
> >> performance, and limiting resource consumption.
> >>
> >> Finally, I do not mean to propose that we necessarily build all of
> >> this into Arrow, just that we would like to make it possible to
> >> build this with Arrow, and that Datasets may find this interesting for
> >> its optimization purposes, if concurrent reads are a goal.
> >>
> >> >  Except that datasets are essentially unordered.
> >>
> >> I did not realize this, but that means it's not really suitable for
> >> our use case, unfortunately.
> >
> > It would be helpful to understand things a bit better so that we do
> > not miss out on an opportunity to collaborate. I don't know that the
> > current mode of some of the public Datasets APIs is a dogmatic
> > view about how everything should always work, and it's possible that
> > some relatively minor changes could allow you to use it. So let's try
> > not to be closing any doors right now.
> >
> >> Thanks,
> >> David
> >>
> >> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> >> >
> >> > On 29/04/2020 at 23:30, David Li wrote:
> >> >> Sure -
> >> >>
> >> >> The use case is to read a large partitioned dataset, consisting of
> >> >> tens or hundreds of Parquet files. A reader expects to scan through
> >> >> the data in order of the partition key. However, to improve
> >> >> performance, we'd like to begin loading files N+1, N+2, ... N + k
> >> >> while the consumer is still reading file N, so that it doesn't have to
> >> >> wait every time it opens a new file, and to help hide any latency or
> >> >> slowness that might be happening on the backend. We also don't want to
> >> >> be in a situation where file N+2 is ready but file N+1 isn't, because
> >> >> that doesn't help us (we still have to wait for N+1 to load).
> >> >
> >> > But depending on network conditions, you may very well get file N+2
> >> > before N+1, even if you start loading it after...
> >> >
> >> >> This is why I mention the project is quite similar to the Datasets
> >> >> project - Datasets likely covers all the functionality we would
> >> >> eventually need.
> >> >
> >> > Except that datasets are essentially unordered.
> >> >
> >> > Regards
> >> >
> >> > Antoine.
> >> >
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Sure, and we are still interested in collaborating. The main use case
we have is scanning datasets in order of the partition key; it seems
ordering is the only missing thing from Antoine's comments. However,
from briefly playing around with the Python API, an application could
manually order the fragments if so desired, so that still works for
us, even if ordering isn't otherwise a guarantee.

Performance-wise, we would want intra-file concurrency (coalescing)
and inter-file concurrency (buffering files in order, as described in
my previous messages). Even if Datasets doesn't directly handle this,
it'd be ideal if an application could achieve this if it were willing
to manage the details. I also vaguely remember seeing some interest in
things like being able to distribute a computation over a dataset via
Dask or some other distributed computation system, which would also be
interesting to us, though not a concrete requirement.
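
To make the intra-file half concrete, the coalescing we rely on boils down
to something like the following deliberately simplified sketch (not the
in-tree implementation; ReadRange is just a bare (offset, length) pair
here): merge ranges whose gap is small enough that one larger request beats
two round trips to object storage.

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct ReadRange {
      int64_t offset;
      int64_t length;
    };

    // Merge ranges separated by less than `hole_size_limit` bytes, trading
    // some wasted bytes for fewer requests.
    std::vector<ReadRange> CoalesceRanges(std::vector<ReadRange> ranges,
                                          int64_t hole_size_limit) {
      if (ranges.empty()) return ranges;
      std::sort(ranges.begin(), ranges.end(),
                [](const ReadRange& a, const ReadRange& b) {
                  return a.offset < b.offset;
                });
      std::vector<ReadRange> merged{ranges.front()};
      for (std::size_t i = 1; i < ranges.size(); ++i) {
        ReadRange& last = merged.back();
        const int64_t gap = ranges[i].offset - (last.offset + last.length);
        if (gap <= hole_size_limit) {
          last.length = std::max<int64_t>(
              last.length, ranges[i].offset + ranges[i].length - last.offset);
        } else {
          merged.push_back(ranges[i]);
        }
      }
      return merged;
    }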

I'd like to reference the original proposal document, which has more
detail on our workloads and use cases:
https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
As described there, we have a library that implements both a
datasets-like API (hand it a remote directory, get back an Arrow
Table) and several optimizations to make that library perform
acceptably. Our motivation here is to be able to have a path to
migrate to using and contributing to Arrow Datasets, which we see as a
cross-language, cross-filesystem library, without regressing in
performance. (We are limited to Python and S3.)

Best,
David

On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
>>
>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>> guaranteed to download all the files in order, but with more control,
>> you can make this more likely. You can also prevent the case where due
>> to scheduling, file N+1 doesn't even start downloading until after
>> file N+2, which can happen if you just submit all reads to a thread
>> pool, as demonstrated in the linked trace.
>>
>> And again, with this level of control, you can also decide to reduce
>> or increase parallelism based on network conditions, memory usage,
>> other readers, etc. So it is both about improving/smoothing out
>> performance, and limiting resource consumption.
>>
>> Finally, I do not mean to propose that we necessarily build all of
>> this into Arrow, just that we would like to make it possible to
>> build this with Arrow, and that Datasets may find this interesting for
>> its optimization purposes, if concurrent reads are a goal.
>>
>> >  Except that datasets are essentially unordered.
>>
>> I did not realize this, but that means it's not really suitable for
>> our use case, unfortunately.
>
> It would be helpful to understand things a bit better so that we do
> not miss out on an opportunity to collaborate. I don't know that the
> current mode of some of the public Datasets APIs is a dogmatic
> view about how everything should always work, and it's possible that
> some relatively minor changes could allow you to use it. So let's try
> not to be closing any doors right now.
>
>> Thanks,
>> David
>>
>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>> >
>> > On 29/04/2020 at 23:30, David Li wrote:
>> >> Sure -
>> >>
>> >> The use case is to read a large partitioned dataset, consisting of
>> >> tens or hundreds of Parquet files. A reader expects to scan through
>> >> the data in order of the partition key. However, to improve
>> >> performance, we'd like to begin loading files N+1, N+2, ... N + k
>> >> while the consumer is still reading file N, so that it doesn't have to
>> >> wait every time it opens a new file, and to help hide any latency or
>> >> slowness that might be happening on the backend. We also don't want to
>> >> be in a situation where file N+2 is ready but file N+1 isn't, because
>> >> that doesn't help us (we still have to wait for N+1 to load).
>> >
>> > But depending on network conditions, you may very well get file N+2
>> > before N+1, even if you start loading it after...
>> >
>> >> This is why I mention the project is quite similar to the Datasets
>> >> project - Datasets likely covers all the functionality we would
>> >> eventually need.
>> >
>> > Except that datasets are essentially unordered.
>> >
>> > Regards
>> >
>> > Antoine.
>> >
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Joris Van den Bossche <jo...@gmail.com>.
On Thu, 30 Apr 2020 at 04:06, Wes McKinney <we...@gmail.com> wrote:

> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
> >
> > Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> > guaranteed to download all the files in order, but with more control,
> > you can make this more likely. You can also prevent the case where due
> > to scheduling, file N+1 doesn't even start downloading until after
> > file N+2, which can happen if you just submit all reads to a thread
> > pool, as demonstrated in the linked trace.
> >
> > And again, with this level of control, you can also decide to reduce
> > or increase parallelism based on network conditions, memory usage,
> > other readers, etc. So it is both about improving/smoothing out
> > performance, and limiting resource consumption.
> >
> > Finally, I do not mean to propose that we necessarily build all of
> > this into Arrow, just that we would like to make it possible to
> > build this with Arrow, and that Datasets may find this interesting for
> > its optimization purposes, if concurrent reads are a goal.
> >
> > >  Except that datasets are essentially unordered.
> >
> > I did not realize this, but that means it's not really suitable for
> > our use case, unfortunately.
>
> It would be helpful to understand things a bit better so that we do
> not miss out on an opportunity to collaborate. I don't know that the
> current mode of some of the public Datasets APIs is a dogmatic
> view about how everything should always work, and it's possible that
> some relatively minor changes could allow you to use it. So let's try
> not to be closing any doors right now.
>

Note that a Dataset itself is actually ordered, AFAIK. Meaning: the list of
Fragments it is composed of is an ordered vector. It's only when eg
consuming scan tasks that the result might not be ordered (this is
currently the case in ToTable, but see
https://issues.apache.org/jira/browse/ARROW-8447 for an issue about
potentially changing this).
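
If that ordering were surfaced to consumers, restoring fragment order
downstream would be cheap. A standalone sketch (assuming, purely for
illustration, one result per fragment tagged with its index, which is not
how the scanner works today):

    #include <cstddef>
    #include <map>
    #include <utility>

    struct Batch {};  // stand-in for a materialized scan result

    // Accepts results in any completion order, emits them in fragment order.
    class OrderedCollector {
     public:
      template <typename Emit>
      void Deliver(std::size_t fragment_index, Batch batch, Emit emit) {
        pending_.emplace(fragment_index, std::move(batch));
        while (!pending_.empty() && pending_.begin()->first == next_) {
          emit(std::move(pending_.begin()->second));
          pending_.erase(pending_.begin());
          ++next_;
        }
      }

     private:
      std::size_t next_ = 0;
      std::map<std::size_t, Batch> pending_;  // buffered out-of-order results
    };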


> > Thanks,
> > David
> >
> > On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> > >
> > > On 29/04/2020 at 23:30, David Li wrote:
> > >> Sure -
> > >>
> > >> The use case is to read a large partitioned dataset, consisting of
> > >> tens or hundreds of Parquet files. A reader expects to scan through
> > >> the data in order of the partition key. However, to improve
> > >> performance, we'd like to begin loading files N+1, N+2, ... N + k
> > >> while the consumer is still reading file N, so that it doesn't have to
> > >> wait every time it opens a new file, and to help hide any latency or
> > >> slowness that might be happening on the backend. We also don't want to
> > >> be in a situation where file N+2 is ready but file N+1 isn't, because
> > >> that doesn't help us (we still have to wait for N+1 to load).
> > >
> > > But depending on network conditions, you may very well get file N+2
> > > before N+1, even if you start loading it after...
> > >
> > >> This is why I mention the project is quite similar to the Datasets
> > >> project - Datasets likely covers all the functionality we would
> > >> eventually need.
> > >
> > > Except that datasets are essentially unordered.
> > >
> > > Regards
> > >
> > > Antoine.
> > >
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
>
> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> guaranteed to download all the files in order, but with more control,
> you can make this more likely. You can also prevent the case where due
> to scheduling, file N+1 doesn't even start downloading until after
> file N+2, which can happen if you just submit all reads to a thread
> pool, as demonstrated in the linked trace.
>
> And again, with this level of control, you can also decide to reduce
> or increase parallelism based on network conditions, memory usage,
> other readers, etc. So it is both about improving/smoothing out
> performance, and limiting resource consumption.
>
> Finally, I do not mean to propose that we necessarily build all of
> this into Arrow, just that we would like to make it possible to
> build this with Arrow, and that Datasets may find this interesting for
> its optimization purposes, if concurrent reads are a goal.
>
> >  Except that datasets are essentially unordered.
>
> I did not realize this, but that means it's not really suitable for
> our use case, unfortunately.

It would be helpful to understand things a bit better so that we do
not miss out on an opportunity to collaborate. I don't know that the
current mode of some of the public Datasets APIs is a dogmatic
view about how everything should always work, and it's possible that
some relatively minor changes could allow you to use it. So let's try
not to be closing any doors right now.

> Thanks,
> David
>
> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> >
> > On 29/04/2020 at 23:30, David Li wrote:
> >> Sure -
> >>
> >> The use case is to read a large partitioned dataset, consisting of
> >> tens or hundreds of Parquet files. A reader expects to scan through
> >> the data in order of the partition key. However, to improve
> >> performance, we'd like to begin loading files N+1, N+2, ... N + k
> >> while the consumer is still reading file N, so that it doesn't have to
> >> wait every time it opens a new file, and to help hide any latency or
> >> slowness that might be happening on the backend. We also don't want to
> >> be in a situation where file N+2 is ready but file N+1 isn't, because
> >> that doesn't help us (we still have to wait for N+1 to load).
> >
> > But depending on network conditions, you may very well get file N+2
> > before N+1, even if you start loading it after...
> >
> >> This is why I mention the project is quite similar to the Datasets
> >> project - Datasets likely covers all the functionality we would
> >> eventually need.
> >
> > Except that datasets are essentially unordered.
> >
> > Regards
> >
> > Antoine.
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
guaranteed to download all the files in order, but with more control,
you can make this more likely. You can also prevent the case where due
to scheduling, file N+1 doesn't even start downloading until after
file N+2, which can happen if you just submit all reads to a thread
pool, as demonstrated in the linked trace.

And again, with this level of control, you can also decide to reduce
or increase parallelism based on network conditions, memory usage,
other readers, etc. So it is both about improving/smoothing out
performance, and limiting resource consumption.
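
Concretely, the kind of knob I mean could be as small as this standalone
sketch (not an Arrow component): a limiter wrapped around each file's read
submissions whose permit count can be raised or lowered at runtime.

    #include <condition_variable>
    #include <cstddef>
    #include <mutex>

    class AdjustableLimiter {
     public:
      explicit AdjustableLimiter(std::size_t limit) : limit_(limit) {}

      // Block until another read/file may start.
      void Acquire() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [&] { return in_flight_ < limit_; });
        ++in_flight_;
      }

      void Release() {
        std::lock_guard<std::mutex> lock(mu_);
        --in_flight_;
        cv_.notify_all();
      }

      // Raise or lower allowed parallelism based on memory use or bandwidth.
      void SetLimit(std::size_t limit) {
        std::lock_guard<std::mutex> lock(mu_);
        limit_ = limit;
        cv_.notify_all();
      }

     private:
      std::mutex mu_;
      std::condition_variable cv_;
      std::size_t limit_;
      std::size_t in_flight_ = 0;
    };

Call Acquire() before issuing a file's reads, Release() when they complete,
and SetLimit() whenever memory pressure or idle bandwidth suggests a change.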

Finally, I do not mean to propose that we necessarily build all of
this into Arrow, just that we would like to make it possible to
build this with Arrow, and that Datasets may find this interesting for
its optimization purposes, if concurrent reads are a goal.

>  Except that datasets are essentially unordered.

I did not realize this, but that means it's not really suitable for
our use case, unfortunately.

Thanks,
David

On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>
> On 29/04/2020 at 23:30, David Li wrote:
>> Sure -
>>
>> The use case is to read a large partitioned dataset, consisting of
>> tens or hundreds of Parquet files. A reader expects to scan through
>> the data in order of the partition key. However, to improve
>> performance, we'd like to begin loading files N+1, N+2, ... N + k
>> while the consumer is still reading file N, so that it doesn't have to
>> wait every time it opens a new file, and to help hide any latency or
>> slowness that might be happening on the backend. We also don't want to
>> be in a situation where file N+2 is ready but file N+1 isn't, because
>> that doesn't help us (we still have to wait for N+1 to load).
>
> But depending on network conditions, you may very well get file N+2
> before N+1, even if you start loading it after...
>
>> This is why I mention the project is quite similar to the Datasets
>> project - Datasets likely covers all the functionality we would
>> eventually need.
>
> Except that datasets are essentially unordered.
>
> Regards
>
> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 29/04/2020 at 23:30, David Li wrote:
> Sure -
> 
> The use case is to read a large partitioned dataset, consisting of
> tens or hundreds of Parquet files. A reader expects to scan through
> the data in order of the partition key. However, to improve
> performance, we'd like to begin loading files N+1, N+2, ... N + k
> while the consumer is still reading file N, so that it doesn't have to
> wait every time it opens a new file, and to help hide any latency or
> slowness that might be happening on the backend. We also don't want to
> be in a situation where file N+2 is ready but file N+1 isn't, because
> that doesn't help us (we still have to wait for N+1 to load).

But depending on network conditions, you may very well get file N+2
before N+1, even if you start loading it after...

> This is why I mention the project is quite similar to the Datasets
> project - Datasets likely covers all the functionality we would
> eventually need.

Except that datasets are essentially unordered.

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Sure -

The use case is to read a large partitioned dataset, consisting of
tens or hundreds of Parquet files. A reader expects to scan through
the data in order of the partition key. However, to improve
performance, we'd like to begin loading files N+1, N+2, ... N + k
while the consumer is still reading file N, so that it doesn't have to
wait every time it opens a new file, and to help hide any latency or
slowness that might be happening on the backend. We also don't want to
be in a situation where file N+2 is ready but file N+1 isn't, because
that doesn't help us (we still have to wait for N+1 to load).

This is why I mention the project is quite similar to the Datasets
project - Datasets likely covers all the functionality we would
eventually need.

Best,
David

On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>
> On 29/04/2020 at 20:49, David Li wrote:
>>
>> However, we noticed this doesn’t actually bring us the expected
>> benefits. Consider files A, B, and C being buffered in parallel; right
>> now, all I/O goes through an internal I/O pool, and so several
>> operations for each of the three files get added to the pool. However,
>> they get serviced in some random order, and so it’s possible for file
>> C to finish all its I/O operations before file B can. Then, a consumer
>> is unnecessarily stuck waiting for those to complete.
>
> It would be good if you explained your use case a bit more precisely.
> Are you expecting the files to be read in a particular order?  If so, why?
>
> Regards
>
> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 29/04/2020 at 20:49, David Li wrote:
> 
> However, we noticed this doesn’t actually bring us the expected
> benefits. Consider files A, B, and C being buffered in parallel; right
> now, all I/O goes through an internal I/O pool, and so several
> operations for each of the three files get added to the pool. However,
> they get serviced in some random order, and so it’s possible for file
> C to finish all its I/O operations before file B can. Then, a consumer
> is unnecessarily stuck waiting for those to complete.

It would be good if you explained your use case a bit more precisely.
Are you expecting the files to be read in a particular order?  If so, why?

Regards

Antoine.