Posted to dev@arrow.apache.org by David Li <li...@gmail.com> on 2020/02/05 13:32:07 UTC

[Discuss] Proposal for optimizing Datasets over S3/object storage

Hello all,

We've been following the Arrow Datasets project with great interest,
especially as we have an in-house library with similar goals built on
top of PyArrow. Recently, we noticed some discussion around optimizing
I/O for such use cases (e.g. PARQUET-1698), which is also where we had
focused our efforts.

Our long-term goal has been to open-source our library. However, our
code is in Python, but it would be most useful to everyone in the C++
core, so that R, Python, Ruby, etc. could benefit. Thus, we'd like to
share our high-level design, and offer to work with the community on
the implementation - at the very least, to avoid duplicating work.
We've summarized our approach, and hope this can start a discussion on
how to integrate such optimizations into Datasets:
https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit#

At a high level, we have three main optimizations:
- Given a set of columns to read, and potentially a filter on a
partition key, we can use Parquet metadata to compute exact byte
ranges to read from remote storage, and coalesce/split up reads as
necessary based on the characteristics of the storage platform.
- Given byte ranges to read, we can read them in parallel, using a
global thread pool and concurrency manager to limit parallelism and
resource consumption.
- By working at the level of a dataset, we can parallelize these
operations across files, and pipeline steps like reading Parquet
metadata with reading and deserialization.
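
To make the first point concrete, here is a minimal sketch of range
coalescing and splitting. It is not our library's code nor an existing Arrow
API; the function name and the two tuning parameters (hole_size_limit,
range_size_limit) are assumptions chosen for illustration. Ranges are
(offset, length) pairs, as might be derived from Parquet column chunk
metadata.

```python
from typing import List, Tuple

Range = Tuple[int, int]  # (offset, length) in bytes


def coalesce_ranges(ranges: List[Range],
                    hole_size_limit: int,
                    range_size_limit: int) -> List[Range]:
    """Merge ranges separated by small gaps, then split oversized results.

    hole_size_limit and range_size_limit are hypothetical knobs that would
    be tuned for the storage platform (e.g. request latency vs. bandwidth).
    """
    if not ranges:
        return []
    ordered = sorted(ranges)
    merged = []  # list of (start, end) pairs
    cur_start, cur_len = ordered[0]
    cur_end = cur_start + cur_len
    for start, length in ordered[1:]:
        end = start + length
        if start - cur_end <= hole_size_limit:
            # Close enough: reading the gap is cheaper than a new request.
            cur_end = max(cur_end, end)
        else:
            merged.append((cur_start, cur_end))
            cur_start, cur_end = start, end
    merged.append((cur_start, cur_end))

    result = []
    for start, end in merged:
        # Split large reads so they can be issued in parallel.
        while end - start > range_size_limit:
            result.append((start, range_size_limit))
            start += range_size_limit
        result.append((start, end - start))
    return result
```

For example, with a 4-byte hole limit and a 30-byte range limit, the ranges
(0, 10), (12, 8) and (100, 50) become one merged read (0, 20) plus two split
reads (100, 30) and (130, 20).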

We focus on Parquet and S3/object storage here, but these concepts
apply to other file formats and storage systems.

The main questions here are whether we think the optimizations are
useful for Arrow Datasets, and if so, how the API design and
implementation would proceed - I'd appreciate any feedback on the
approach here and potential API.

David

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Thu, Feb 6, 2020, 12:41 PM Antoine Pitrou <an...@python.org> wrote:

>
> Le 06/02/2020 à 19:37, Wes McKinney a écrit :
> > On Thu, Feb 6, 2020, 12:12 PM Antoine Pitrou <an...@python.org> wrote:
> >
> >> Le 06/02/2020 à 16:26, Wes McKinney a écrit :
> >>>
> >>> This seems useful, too. It becomes a question of where do you want to
> >>> manage the cached memory segments, however you obtain them. I'm
> >>> arguing that we should not have much custom code in the Parquet
> >>> library to manage the prefetched segments (and providing the correct
> >>> buffer slice to each column reader when they need it), and instead
> >>> encapsulate this logic so it can be reused.
> >>
> >> I see, so RandomAccessFile would have some associative caching logic to
> >> find whether the exact requested range was cached and then return it to
> >> the caller?  That sounds doable.  How is lifetime handled then?  Are
> >> cached buffers kept on the RandomAccessFile until they are requested, at
> >> which point their ownership is transferred to the caller?
> >>
> >
> > This seems like too much to try to build into RandomAccessFile. I would
> > suggest a class that wraps a random access file and manages cached
> segments
> > and their lifetimes through explicit APIs.
>
> So Parquet would expect to receive that class rather than
> RandomAccessFile?  Or it would grow separate paths for it?
>

If the user opts in to coalesced prefetching then the RowGroupReader would
instantiate the wrapper under the hood. Public APIs (aside from new APIs in
ReaderProperties for prefetching) would be unchanged.
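
A rough sketch of what such a wrapper could look like, written in Python for
brevity rather than C++. The class and method names (CachedRangeReader,
prefetch, read, release) are hypothetical and not an existing Arrow or
Parquet API; the point is only that the cached segments and their lifetimes
are managed through explicit calls, outside the file class itself.

```python
import io


class CachedRangeReader:
    """Wraps a seekable binary file and serves reads from prefetched ranges.

    Hypothetical illustration only: names and behaviour are assumptions.
    """

    def __init__(self, file):
        self._file = file
        self._cache = {}  # (offset, length) -> bytes

    def prefetch(self, ranges):
        # A real implementation would coalesce ranges and fetch them
        # asynchronously; here each range is read eagerly for simplicity.
        for offset, length in ranges:
            self._file.seek(offset)
            self._cache[(offset, length)] = self._file.read(length)

    def read(self, offset, length):
        # Serve from any cached range that fully contains the request.
        for (c_off, c_len), data in self._cache.items():
            if c_off <= offset and offset + length <= c_off + c_len:
                rel = offset - c_off
                return data[rel:rel + length]
        # Cache miss: fall back to the underlying file.
        self._file.seek(offset)
        return self._file.read(length)

    def release(self, offset, length):
        # Explicit lifetime management: the caller drops a cached range
        # once the column reader that needed it is done.
        self._cache.pop((offset, length), None)
```

A column reader would call read() with the slice it needs, and the row group
reader would call release() for each range once it is consumed.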



>
>
>
> Regards
>
> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
hi David,

Yes, this sounds right to me. I would say that we should come up with
the public API for column prebuffering ASAP and then get to work on
implementing it and working to maximize the throughput.

- Wes

On Wed, Mar 18, 2020 at 11:37 AM David Li <li...@gmail.com> wrote:
>
> Hi all,
>
> Thanks to Antoine for implementing the core read coalescing logic.
>
> We've taken a look at what else needs to be done to get this working,
> and it sounds like the following changes would be worthwhile,
> independent of the rest of the optimizations we discussed:
>
> - Add benchmarks of the current Parquet reader with the current S3File
> (and other file implementations) so we can track
> improvements/regressions
> - Use the coalescing inside the Parquet reader (even without a column
> filter hint - this would subsume PARQUET-1698)
> - In coalescing, split large read ranges into smaller ones (this would
> further improve on PARQUET-1698 by taking advantage of parallel reads)
> - Accept a column filter hint in the Parquet reader and use that to
> compute read ranges
>
> Does this sound reasonable?
>
> Thanks,
> David
>
> On 2/6/20, David Li <li...@gmail.com> wrote:
> > Catching up on questions here...
> >
> >> Typically you can solve this by having enough IO concurrency at once :-)
> >> I'm not sure having sophisticated global coordination (based on which
> >> algorithms) would bring anything.  Would you care to elaborate?
> >
> > We aren't proposing *sophisticated* global coordination, rather, just
> > using a global pool with a global limit, so that a user doesn't
> > unintentionally start hundreds of requests in parallel, and so that
> > you can adjust the resource consumption/performance tradeoff.
> >
> > Essentially, what our library does is maintain two pools (for I/O):
> > - One pool produces I/O requests, by going through the list of files,
> > fetching the Parquet footers, and queuing up I/O requests on the main
> > pool. (This uses a pool so we can fetch and parse metadata from
> > multiple Parquet files at once.)
> > - One pool serves I/O requests, by fetching chunks and placing them in
> > buffers inside the file object implementation.
> >
> > The global concurrency manager additionally limits the second pool by
> > not servicing I/O requests for a file until all of the I/O requests
> > for previous files have at least started. (By just having lots of
> > concurrency, you might end up starving yourself by reading data you
> > don't want quite yet.)
> >
> > Additionally, the global pool could still be a win for non-Parquet
> > files - an implementation can at least submit, say, an entire CSV file
> > as a "chunk" and have it read in the background.
> >
> >> Actually, on a more high-level basis, is the goal to prefetch for
> >> sequential consumption of row groups?
> >
> > At least for us, our query pattern is to sequentially consume row
> > groups from a large dataset, where we select a subset of columns and a
> > subset of the partition key range (usually time range). Prefetching
> > speeds this up substantially, or in general, pipelining discovery of
> > files, I/O, and deserialization.
> >
> >> There are no situations where you would want to consume a scattered
> >> subset of row groups (e.g. predicate pushdown)?
> >
> > With coalescing, this "automatically" gets optimized. If you happen to
> > need column chunks from separate row groups that are adjacent or close
> > on-disk, coalescing will still fetch them in a single IO call.
> >
> > We found that having large row groups was more beneficial than small
> > row groups, since when you combine small row groups with column
> > selection, you end up with a lot of small non-adjacent column chunks -
> > which coalescing can't help with. The exact tradeoff depends on the
> > dataset and workload, of course.
> >
> >> This seems like too much to try to build into RandomAccessFile. I would
> >> suggest a class that wraps a random access file and manages cached
> >> segments
> >> and their lifetimes through explicit APIs.
> >
> > A wrapper class seems ideal, especially as the logic is agnostic to
> > the storage backend (except for some parameters which can either be
> > hand-tuned or estimated on the fly). It also keeps the scope of the
> > changes down.
> >
> >> Where to put the "async multiple range request" API is a separate
> >> question,
> >> though. Probably makes sense to start writing some working code and sort
> >> it
> >> out there.
> >
> > We haven't looked in this direction much. Our designs are based around
> > thread pools partly because we wanted to avoid modifying the Parquet
> > and Arrow internals, instead choosing to modify the I/O layer to "keep
> > Parquet fed" as quickly as possible.
> >
> > Overall, I recall there's an issue open for async APIs in
> > Arrow...perhaps we want to move that to a separate discussion, or on
> > the contrary, explore some experimental APIs here to inform the
> > overall design.
> >
> > Thanks,
> > David
> >
> > On 2/6/20, Wes McKinney <we...@gmail.com> wrote:
> >> On Thu, Feb 6, 2020 at 1:30 PM Antoine Pitrou <an...@python.org> wrote:
> >>>
> >>>
> >>> Le 06/02/2020 à 20:20, Wes McKinney a écrit :
> >>> >> Actually, on a more high-level basis, is the goal to prefetch for
> >>> >> sequential consumption of row groups?
> >>> >>
> >>> >
> >>> > Essentially yes. One "easy" optimization is to prefetch the entire
> >>> > serialized row group. This is an evolution of that idea where we want
> >>> > to
> >>> > prefetch only the needed parts of a row group in a minimum number of
> >>> > IO
> >>> > calls (consider reading the first 10 columns from a file with 1000
> >>> > columns
> >>> > -- so we want to do one IO call instead of 10 like we do now).
> >>>
> >>> There are no situations where you would want to consume a scattered
> >>> subset of row groups (e.g. predicate pushdown)?
> >>
> >> There are. If it can be demonstrated that there are performance gains
> >> resulting from IO optimizations involving multiple row groups then I
> >> see no reason not to implement them.
> >>
> >>> Regards
> >>>
> >>> Antoine.
> >>
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
(Apologies for the double-email)

In the original coalescing PR, an "AsyncContext" abstraction was
discussed. I could imagine being able to hold arbitrary
attributes/metrics for tasks that a scheduler could then take into
account, while making it easier for applications to thread through all
the different custom attributes they care about without having to add
parameters all over Arrow.

Best,
David

On 5/1/20, David Li <li...@gmail.com> wrote:
> Wes,
>
> From the outline there it seems like a path forward. I think the mode
> that would be helpful here is some sort of simple ordering/dependency
> so that the scheduler knows not to schedule subtasks of B until all
> subtasks of A have started (but not necessarily finished).
>
> I think the other part would be sizing the pool/number of concurrent
> tasks not just by number of tasks, but by some other metric (e.g. if a
> user attached a memory usage or bandwidth usage number to each task,
> and asked the scheduler to try to stay below some threshold).
>
> Ideally if both were somewhat application-pluggable (e.g. Datasets
> attaches an ordering to tasks and the coalescer attaches a memory
> usage metric, but it's up to the application to provide a scheduler
> that actually accounts for those), that would let applications do
> whatever wonky logic they need and limit the amount of bespoke things
> maintained in Arrow, at least until it's clear what sorts of
> strategies are commonly needed.
>
> Thanks,
> David
>
> On 5/1/20, Wes McKinney <we...@gmail.com> wrote:
>> I just wrote up a ticket about a general purpose multi-consumer
>> scheduler API, do you think this could be the beginning of a
>> resolution?
>>
>> https://issues.apache.org/jira/browse/ARROW-8667
>>
>> We may also want to design in some affordances so that no consumer is
>> ever 100% blocked, even if that causes temporary thread
>> oversubscription (e.g. if we have a 16-thread underlying thread pool,
>> and they're all being used by one consumer, then a new consumer shows
>> up, we spawn a 17-th thread to accommodate the new consumer
>> temporarily until some of the other tasks finish, at which point we
>> drop down to 16 threads again, but share them fairly)
>>
>> I haven't read the whole e-mail discussion but I will read more of it
>> and make some other comments.
>>
>> On Thu, Apr 30, 2020 at 3:17 PM David Li <li...@gmail.com> wrote:
>>>
>>> Francois,
>>>
>>> Thanks for the pointers. I'll see if I can put together a
>>> proof-of-concept, might that help discussion? I agree it would be good
>>> to make it format-agnostic. I'm also curious what thoughts you'd have
>>> on how to manage cross-file parallelism (coalescing only helps within
>>> a file). If we just naively start scanning fragments in parallel, we'd
>>> still want some way to help ensure the actual reads get issued roughly
>>> in order of file (to avoid the problem discussed above, where reads
>>> for file B prevent reads for file A from getting scheduled, where B
>>> follows A from the consumer's standpoint).
>>>
>>> Antoine,
>>>
>>> We would be interested in that as well. One thing we do want to
>>> investigate is a better ReadAsync() implementation for S3File as
>>> preliminary benchmarking on our side has shown it's quite inefficient
>>> (the default implementation makes lots of memcpy()s).
>>>
>>> Thanks,
>>> David
>>>
>>> On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
>>> >
>>> > If we want to discuss IO APIs we should do that comprehensively.
>>> > There are various ways of expressing what we want to do (explicit
>>> > readahead, fadvise-like APIs, async APIs, etc.).
>>> >
>>> > Regards
>>> >
>>> > Antoine.
>>> >
>>> >
>>> > Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
>>> >> One more point,
>>> >>
>>> >> It would seem beneficial if we could express this in
>>> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
>>> >> buffering/coalescing would be needed. In the case of Parquet, we'd
>>> >> get
>>> >> the _exact_ ranges computed from the metadata. This method would also
>>> >> possibly benefit other filesystems since on linux it can call
>>> >> `readahead` and/or `madvise`.
>>> >>
>>> >> François
>>> >>
>>> >>
>>> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
>>> >> <fs...@gmail.com> wrote:
>>> >>>
>>> >>> Hello David,
>>> >>>
>>> >>> I think that what you ask is achievable with the dataset API without
>>> >>> much effort. You'd have to insert the pre-buffering at
>>> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method
>>> >>> is
>>> >>> essentially a generator that looks like
>>> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
>>> >>> fragment in-order. The application consuming the ScanTask could
>>> >>> control the number of scheduled tasks by looking at the IO pool
>>> >>> load.
>>> >>>
>>> >>> OTOH, It would be good if we could make this format agnostic, e.g.
>>> >>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
>>> >>> would be applicable to all formats, CSV, ipc, ...
>>> >>>
>>> >>> François
>>> >>> [1]
>>> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>>> >>>
>>> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com>
>>> >>> wrote:
>>> >>>>
>>> >>>> Sure, and we are still interested in collaborating. The main use
>>> >>>> case
>>> >>>> we have is scanning datasets in order of the partition key; it
>>> >>>> seems
>>> >>>> ordering is the only missing thing from Antoine's comments.
>>> >>>> However,
>>> >>>> from briefly playing around with the Python API, an application
>>> >>>> could
>>> >>>> manually order the fragments if so desired, so that still works for
>>> >>>> us, even if ordering isn't otherwise a guarantee.
>>> >>>>
>>> >>>> Performance-wise, we would want intra-file concurrency (coalescing)
>>> >>>> and inter-file concurrency (buffering files in order, as described
>>> >>>> in
>>> >>>> my previous messages). Even if Datasets doesn't directly handle
>>> >>>> this,
>>> >>>> it'd be ideal if an application could achieve this if it were
>>> >>>> willing
>>> >>>> to manage the details. I also vaguely remember seeing some interest
>>> >>>> in
>>> >>>> things like being able to distribute a computation over a dataset
>>> >>>> via
>>> >>>> Dask or some other distributed computation system, which would also
>>> >>>> be
>>> >>>> interesting to us, though not a concrete requirement.
>>> >>>>
>>> >>>> I'd like to reference the original proposal document, which has
>>> >>>> more
>>> >>>> detail on our workloads and use cases:
>>> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
>>> >>>> As described there, we have a library that implements both a
>>> >>>> datasets-like API (hand it a remote directory, get back an Arrow
>>> >>>> Table) and several optimizations to make that library perform
>>> >>>> acceptably. Our motivation here is to be able to have a path to
>>> >>>> migrate to using and contributing to Arrow Datasets, which we see
>>> >>>> as
>>> >>>> a
>>> >>>> cross-language, cross-filesystem library, without regressing in
>>> >>>> performance. (We are limited to Python and S3.)
>>> >>>>
>>> >>>> Best,
>>> >>>> David
>>> >>>>
>>> >>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
>>> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com>
>>> >>>>> wrote:
>>> >>>>>>
>>> >>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>>> >>>>>> guaranteed to download all the files in order, but with more
>>> >>>>>> control,
>>> >>>>>> you can make this more likely. You can also prevent the case
>>> >>>>>> where
>>> >>>>>> due
>>> >>>>>> to scheduling, file N+1 doesn't even start downloading until
>>> >>>>>> after
>>> >>>>>> file N+2, which can happen if you just submit all reads to a
>>> >>>>>> thread
>>> >>>>>> pool, as demonstrated in the linked trace.
>>> >>>>>>
>>> >>>>>> And again, with this level of control, you can also decide to
>>> >>>>>> reduce
>>> >>>>>> or increase parallelism based on network conditions, memory
>>> >>>>>> usage,
>>> >>>>>> other readers, etc. So it is both about improving/smoothing out
>>> >>>>>> performance, and limiting resource consumption.
>>> >>>>>>
>>> >>>>>> Finally, I do not mean to propose that we necessarily build all
>>> >>>>>> of
>>> >>>>>> this into Arrow, just that we would like to make it possible
>>> >>>>>> to
>>> >>>>>> build this with Arrow, and that Datasets may find this
>>> >>>>>> interesting
>>> >>>>>> for
>>> >>>>>> its optimization purposes, if concurrent reads are a goal.
>>> >>>>>>
>>> >>>>>>>  Except that datasets are essentially unordered.
>>> >>>>>>
>>> >>>>>> I did not realize this, but that means it's not really suitable
>>> >>>>>> for
>>> >>>>>> our use case, unfortunately.
>>> >>>>>
>>> >>>>> It would be helpful to understand things a bit better so that we
>>> >>>>> do
>>> >>>>> not miss out on an opportunity to collaborate. I don't know that
>>> >>>>> the
>>> >>>>> current mode of some of the public Datasets APIs is a dogmatic
>>> >>>>> view about how everything should always work, and it's possible
>>> >>>>> that
>>> >>>>> some relatively minor changes could allow you to use it. So let's
>>> >>>>> try
>>> >>>>> not to be closing any doors right now
>>> >>>>>
>>> >>>>>> Thanks,
>>> >>>>>> David
>>> >>>>>>
>>> >>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>>> >>>>>>>
>>> >>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
>>> >>>>>>>> Sure -
>>> >>>>>>>>
>>> >>>>>>>> The use case is to read a large partitioned dataset, consisting
>>> >>>>>>>> of
>>> >>>>>>>> tens or hundreds of Parquet files. A reader expects to scan
>>> >>>>>>>> through
>>> >>>>>>>> the data in order of the partition key. However, to improve
>>> >>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N +
>>> >>>>>>>> k
>>> >>>>>>>> while the consumer is still reading file N, so that it doesn't
>>> >>>>>>>> have
>>> >>>>>>>> to
>>> >>>>>>>> wait every time it opens a new file, and to help hide any
>>> >>>>>>>> latency
>>> >>>>>>>> or
>>> >>>>>>>> slowness that might be happening on the backend. We also don't
>>> >>>>>>>> want
>>> >>>>>>>> to
>>> >>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't,
>>> >>>>>>>> because
>>> >>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
>>> >>>>>>>
>>> >>>>>>> But depending on network conditions, you may very well get file
>>> >>>>>>> N+2
>>> >>>>>>> before N+1, even if you start loading it after...
>>> >>>>>>>
>>> >>>>>>>> This is why I mention the project is quite similar to the
>>> >>>>>>>> Datasets
>>> >>>>>>>> project - Datasets likely covers all the functionality we would
>>> >>>>>>>> eventually need.
>>> >>>>>>>
>>> >>>>>>> Except that datasets are essentially unordered.
>>> >>>>>>>
>>> >>>>>>> Regards
>>> >>>>>>>
>>> >>>>>>> Antoine.
>>> >>>>>>>
>>> >>>>>
>>> >
>>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Wes,

From the outline there it seems like a path forward. I think the mode
that would be helpful here is some sort of simple ordering/dependency
so that the scheduler knows not to schedule subtasks of B until all
subtasks of A have started (but not necessarily finished).
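
As a minimal sketch of that ordering (the class and method names are made
up for illustration, and this is not what ARROW-8667 specifies): if each
subtask carries its file's position, a priority queue dispatches every
queued subtask of A before any subtask of B, without waiting for A's
subtasks to finish.

```python
import heapq
import itertools


class OrderedTaskQueue:
    """Dispatches subtasks in order of their group (e.g. file index).

    Hypothetical illustration: a worker pool would call pop_next() whenever
    a thread becomes free.
    """

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # preserves submission order in a group

    def submit(self, group, task):
        heapq.heappush(self._heap, (group, next(self._seq), task))

    def pop_next(self):
        group, _, task = heapq.heappop(self._heap)
        return group, task

    def __len__(self):
        return len(self._heap)
```

Note this only orders what is already queued; a subtask of A submitted
after B's subtasks have been dispatched would still start late.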

I think the other part would be sizing the pool/number of concurrent
tasks not just by number of tasks, but by some other metric (e.g. if a
user attached a memory usage or bandwidth usage number to each task,
and asked the scheduler to try to stay below some threshold).
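
A small sketch of that kind of admission control, assuming each task
reports a cost such as its expected buffer size in bytes. The class name and
methods are hypothetical, and thread safety is left out to keep it short.

```python
class BudgetedAdmission:
    """Admits tasks while the sum of their costs stays within a budget.

    Hypothetical illustration; a real scheduler would guard this state with
    a lock and wake waiting tasks when finish() frees capacity.
    """

    def __init__(self, budget):
        self.budget = budget
        self.in_use = 0

    def try_start(self, cost):
        # Always admit when idle so one oversized task cannot block forever.
        if self.in_use and self.in_use + cost > self.budget:
            return False
        self.in_use += cost
        return True

    def finish(self, cost):
        self.in_use -= cost
```

With a budget of 100, a task of cost 60 is admitted, a following task of
cost 50 is deferred, and a task of cost 40 still fits.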

Ideally if both were somewhat application-pluggable (e.g. Datasets
attaches an ordering to tasks and the coalescer attaches a memory
usage metric, but it's up to the application to provide a scheduler
that actually accounts for those), that would let applications do
whatever wonky logic they need and limit the amount of bespoke things
maintained in Arrow, at least until it's clear what sorts of
strategies are commonly needed.

Thanks,
David

On 5/1/20, Wes McKinney <we...@gmail.com> wrote:
> I just wrote up a ticket about a general purpose multi-consumer
> scheduler API, do you think this could be the beginning of a
> resolution?
>
> https://issues.apache.org/jira/browse/ARROW-8667
>
> We may also want to design in some affordances so that no consumer is
> ever 100% blocked, even if that causes temporary thread
> oversubscription (e.g. if we have a 16-thread underlying thread pool,
> and they're all being used by one consumer, then a new consumer shows
> up, we spawn a 17-th thread to accommodate the new consumer
> temporarily until some of the other tasks finish, at which point we
> drop down to 16 threads again, but share them fairly)
>
> I haven't read the whole e-mail discussion but I will read more of it
> and make some other comments.
>
> On Thu, Apr 30, 2020 at 3:17 PM David Li <li...@gmail.com> wrote:
>>
>> Francois,
>>
>> Thanks for the pointers. I'll see if I can put together a
>> proof-of-concept, might that help discussion? I agree it would be good
>> to make it format-agnostic. I'm also curious what thoughts you'd have
>> on how to manage cross-file parallelism (coalescing only helps within
>> a file). If we just naively start scanning fragments in parallel, we'd
>> still want some way to help ensure the actual reads get issued roughly
>> in order of file (to avoid the problem discussed above, where reads
>> for file B prevent reads for file A from getting scheduled, where B
>> follows A from the consumer's standpoint).
>>
>> Antoine,
>>
>> We would be interested in that as well. One thing we do want to
>> investigate is a better ReadAsync() implementation for S3File as
>> preliminary benchmarking on our side has shown it's quite inefficient
>> (the default implementation makes lots of memcpy()s).
>>
>> Thanks,
>> David
>>
>> On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
>> >
>> > If we want to discuss IO APIs we should do that comprehensively.
>> > There are various ways of expressing what we want to do (explicit
>> > readahead, fadvise-like APIs, async APIs, etc.).
>> >
>> > Regards
>> >
>> > Antoine.
>> >
>> >
>> > Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
>> >> One more point,
>> >>
>> >> It would seem beneficial if we could express this in
>> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
>> >> buffering/coalescing would be needed. In the case of Parquet, we'd get
>> >> the _exact_ ranges computed from the metadata. This method would also
>> >> possibly benefit other filesystems since on linux it can call
>> >> `readahead` and/or `madvise`.
>> >>
>> >> François
>> >>
>> >>
>> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
>> >> <fs...@gmail.com> wrote:
>> >>>
>> >>> Hello David,
>> >>>
>> >>> I think that what you ask is achievable with the dataset API without
>> >>> much effort. You'd have to insert the pre-buffering at
>> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method
>> >>> is
>> >>> essentially a generator that looks like
>> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
>> >>> fragment in-order. The application consuming the ScanTask could
>> >>> control the number of scheduled tasks by looking at the IO pool load.
>> >>>
>> >>> OTOH, It would be good if we could make this format agnostic, e.g.
>> >>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
>> >>> would be applicable to all formats, CSV, ipc, ...
>> >>>
>> >>> François
>> >>> [1]
>> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>> >>>
>> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> Sure, and we are still interested in collaborating. The main use
>> >>>> case
>> >>>> we have is scanning datasets in order of the partition key; it seems
>> >>>> ordering is the only missing thing from Antoine's comments. However,
>> >>>> from briefly playing around with the Python API, an application
>> >>>> could
>> >>>> manually order the fragments if so desired, so that still works for
>> >>>> us, even if ordering isn't otherwise a guarantee.
>> >>>>
>> >>>> Performance-wise, we would want intra-file concurrency (coalescing)
>> >>>> and inter-file concurrency (buffering files in order, as described
>> >>>> in
>> >>>> my previous messages). Even if Datasets doesn't directly handle
>> >>>> this,
>> >>>> it'd be ideal if an application could achieve this if it were
>> >>>> willing
>> >>>> to manage the details. I also vaguely remember seeing some interest
>> >>>> in
>> >>>> things like being able to distribute a computation over a dataset
>> >>>> via
>> >>>> Dask or some other distributed computation system, which would also
>> >>>> be
>> >>>> interesting to us, though not a concrete requirement.
>> >>>>
>> >>>> I'd like to reference the original proposal document, which has more
>> >>>> detail on our workloads and use cases:
>> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
>> >>>> As described there, we have a library that implements both a
>> >>>> datasets-like API (hand it a remote directory, get back an Arrow
>> >>>> Table) and several optimizations to make that library perform
>> >>>> acceptably. Our motivation here is to be able to have a path to
>> >>>> migrate to using and contributing to Arrow Datasets, which we see as
>> >>>> a
>> >>>> cross-language, cross-filesystem library, without regressing in
>> >>>> performance. (We are limited to Python and S3.)
>> >>>>
>> >>>> Best,
>> >>>> David
>> >>>>
>> >>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
>> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>> >>>>>> guaranteed to download all the files in order, but with more
>> >>>>>> control,
>> >>>>>> you can make this more likely. You can also prevent the case where
>> >>>>>> due
>> >>>>>> to scheduling, file N+1 doesn't even start downloading until after
>> >>>>>> file N+2, which can happen if you just submit all reads to a
>> >>>>>> thread
>> >>>>>> pool, as demonstrated in the linked trace.
>> >>>>>>
>> >>>>>> And again, with this level of control, you can also decide to
>> >>>>>> reduce
>> >>>>>> or increase parallelism based on network conditions, memory usage,
>> >>>>>> other readers, etc. So it is both about improving/smoothing out
>> >>>>>> performance, and limiting resource consumption.
>> >>>>>>
>> >>>>>> Finally, I do not mean to propose that we necessarily build all of
>> >>>>>> this into Arrow, just that we would like to make it possible to
>> >>>>>> build this with Arrow, and that Datasets may find this interesting
>> >>>>>> for
>> >>>>>> its optimization purposes, if concurrent reads are a goal.
>> >>>>>>
>> >>>>>>>  Except that datasets are essentially unordered.
>> >>>>>>
>> >>>>>> I did not realize this, but that means it's not really suitable
>> >>>>>> for
>> >>>>>> our use case, unfortunately.
>> >>>>>
>> >>>>> It would be helpful to understand things a bit better so that we do
>> >>>>> not miss out on an opportunity to collaborate. I don't know that
>> >>>>> the
>> >>>>> current mode of some of the public Datasets APIs is a dogmatic
>> >>>>> view about how everything should always work, and it's possible
>> >>>>> that
>> >>>>> some relatively minor changes could allow you to use it. So let's
>> >>>>> try
>> >>>>> not to be closing any doors right now
>> >>>>>
>> >>>>>> Thanks,
>> >>>>>> David
>> >>>>>>
>> >>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>> >>>>>>>
>> >>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
>> >>>>>>>> Sure -
>> >>>>>>>>
>> >>>>>>>> The use case is to read a large partitioned dataset, consisting
>> >>>>>>>> of
>> >>>>>>>> tens or hundreds of Parquet files. A reader expects to scan
>> >>>>>>>> through
>> >>>>>>>> the data in order of the partition key. However, to improve
>> >>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N +
>> >>>>>>>> k
>> >>>>>>>> while the consumer is still reading file N, so that it doesn't
>> >>>>>>>> have
>> >>>>>>>> to
>> >>>>>>>> wait every time it opens a new file, and to help hide any
>> >>>>>>>> latency
>> >>>>>>>> or
>> >>>>>>>> slowness that might be happening on the backend. We also don't
>> >>>>>>>> want
>> >>>>>>>> to
>> >>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't,
>> >>>>>>>> because
>> >>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
>> >>>>>>>
>> >>>>>>> But depending on network conditions, you may very well get file
>> >>>>>>> N+2
>> >>>>>>> before N+1, even if you start loading it after...
>> >>>>>>>
>> >>>>>>>> This is why I mention the project is quite similar to the
>> >>>>>>>> Datasets
>> >>>>>>>> project - Datasets likely covers all the functionality we would
>> >>>>>>>> eventually need.
>> >>>>>>>
>> >>>>>>> Except that datasets are essentially unordered.
>> >>>>>>>
>> >>>>>>> Regards
>> >>>>>>>
>> >>>>>>> Antoine.
>> >>>>>>>
>> >>>>>
>> >
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
I just wrote up a ticket about a general purpose multi-consumer
scheduler API, do you think this could be the beginning of a
resolution?

https://issues.apache.org/jira/browse/ARROW-8667

We may also want to design in some affordances so that no consumer is
ever 100% blocked, even if that causes temporary thread
oversubscription. For example, if we have a 16-thread underlying thread
pool and all of its threads are being used by one consumer when a new
consumer shows up, we spawn a 17th thread to accommodate the new
consumer temporarily until some of the other tasks finish, at which
point we drop down to 16 threads again, but share them fairly.
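
To make the oversubscription rule concrete, here is a minimal Python
sketch of the slot accounting only. It is an illustration under
assumptions, not the design in ARROW-8667: the names SlotAccounting,
try_start and finish are hypothetical, no real threads are spawned, and
the "share them fairly" part is deliberately left out.

```python
from collections import Counter


class SlotAccounting:
    """Per-consumer task accounting for a pool with a soft thread limit.

    Hypothetical illustration: it only counts slots, it does not run tasks.
    """

    def __init__(self, capacity):
        self.capacity = capacity  # nominal pool size, e.g. 16
        self.running = Counter()  # consumer -> tasks currently running

    def total(self):
        """Number of tasks running across all consumers."""
        return sum(self.running.values())

    def try_start(self, consumer):
        """Return True if `consumer` may start one more task right now."""
        if self.total() < self.capacity:
            allowed = True
        else:
            # Pool is full: oversubscribe, but only for a consumer that has
            # nothing running, so that no consumer is ever 100% blocked.
            allowed = self.running[consumer] == 0
        if allowed:
            self.running[consumer] += 1
        return allowed

    def finish(self, consumer):
        """Record that one of `consumer`'s tasks has completed."""
        self.running[consumer] -= 1
```

With capacity=16, a consumer that already holds all 16 slots is refused
a 17th, while a newly arrived consumer is granted one. Once enough of
the earlier tasks finish, the total falls back to 16 or fewer and
normal admission resumes.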

I haven't read the whole e-mail discussion but I will read more of it
and make some other comments.

On Thu, Apr 30, 2020 at 3:17 PM David Li <li...@gmail.com> wrote:
>
> Francois,
>
> Thanks for the pointers. I'll see if I can put together a
> proof-of-concept, might that help discussion? I agree it would be good
> to make it format-agnostic. I'm also curious what thoughts you'd have
> on how to manage cross-file parallelism (coalescing only helps within
> a file). If we just naively start scanning fragments in parallel, we'd
> still want some way to help ensure the actual reads get issued roughly
> in order of file (to avoid the problem discussed above, where reads
> for file B prevent reads for file A from getting scheduled, where B
> follows A from the consumer's standpoint).
>
> Antoine,
>
> We would be interested in that as well. One thing we do want to
> investigate is a better ReadAsync() implementation for S3File as
> preliminary benchmarking on our side has shown it's quite inefficient
> (the default implementation makes lots of memcpy()s).
>
> Thanks,
> David
>
> On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
> >
> > If we want to discuss IO APIs we should do that comprehensively.
> > There are various ways of expressing what we want to do (explicit
> > readahead, fadvise-like APIs, async APIs, etc.).
> >
> > Regards
> >
> > Antoine.
> >
> >
> > Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
> >> One more point,
> >>
> >> It would seem beneficial if we could express this in
> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
> >> buffering/coalescing would be needed. In the case of Parquet, we'd get
> >> the _exact_ ranges computed from the metadata. This method would also
> >> possibly benefit other filesystems since on linux it can call
> >> `readahead` and/or `madvise`.
> >>
> >> François
> >>
> >>
> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
> >> <fs...@gmail.com> wrote:
> >>>
> >>> Hello David,
> >>>
> >>> I think that what you ask is achievable with the dataset API without
> >>> much effort. You'd have to insert the pre-buffering at
> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
> >>> essentially a generator that looks like
> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
> >>> fragment in-order. The application consuming the ScanTask could
> >>> control the number of scheduled tasks by looking at the IO pool load.
> >>>
> >>> OTOH, It would be good if we could make this format agnostic, e.g.
> >>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
> >>> would be applicable to all formats, CSV, ipc, ...
> >>>
> >>> François
> >>> [1]
> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
> >>>
> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
> >>>>
> >>>> Sure, and we are still interested in collaborating. The main use case
> >>>> we have is scanning datasets in order of the partition key; it seems
> >>>> ordering is the only missing thing from Antoine's comments. However,
> >>>> from briefly playing around with the Python API, an application could
> >>>> manually order the fragments if so desired, so that still works for
> >>>> us, even if ordering isn't otherwise a guarantee.
> >>>>
> >>>> Performance-wise, we would want intra-file concurrency (coalescing)
> >>>> and inter-file concurrency (buffering files in order, as described in
> >>>> my previous messages). Even if Datasets doesn't directly handle this,
> >>>> it'd be ideal if an application could achieve this if it were willing
> >>>> to manage the details. I also vaguely remember seeing some interest in
> >>>> things like being able to distribute a computation over a dataset via
> >>>> Dask or some other distributed computation system, which would also be
> >>>> interesting to us, though not a concrete requirement.
> >>>>
> >>>> I'd like to reference the original proposal document, which has more
> >>>> detail on our workloads and use cases:
> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> >>>> As described there, we have a library that implements both a
> >>>> datasets-like API (hand it a remote directory, get back an Arrow
> >>>> Table) and several optimizations to make that library perform
> >>>> acceptably. Our motivation here is to be able to have a path to
> >>>> migrate to using and contributing to Arrow Datasets, which we see as a
> >>>> cross-language, cross-filesystem library, without regressing in
> >>>> performance. (We are limited to Python and S3.)
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
> >>>>>>
> >>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> >>>>>> guaranteed to download all the files in order, but with more control,
> >>>>>> you can make this more likely. You can also prevent the case where due
> >>>>>> to scheduling, file N+1 doesn't even start downloading until after
> >>>>>> file N+2, which can happen if you just submit all reads to a thread
> >>>>>> pool, as demonstrated in the linked trace.
> >>>>>>
> >>>>>> And again, with this level of control, you can also decide to reduce
> >>>>>> or increase parallelism based on network conditions, memory usage,
> >>>>>> other readers, etc. So it is both about improving/smoothing out
> >>>>>> performance, and limiting resource consumption.
> >>>>>>
> >>>>>> Finally, I do not mean to propose that we necessarily build all of
> >>>>>> this into Arrow, just that we would like to make it possible to
> >>>>>> build this with Arrow, and that Datasets may find this interesting for
> >>>>>> its optimization purposes, if concurrent reads are a goal.
> >>>>>>
> >>>>>>>  Except that datasets are essentially unordered.
> >>>>>>
> >>>>>> I did not realize this, but that means it's not really suitable for
> >>>>>> our use case, unfortunately.
> >>>>>
> >>>>> It would be helpful to understand things a bit better so that we do
> >>>>> not miss out on an opportunity to collaborate. I don't know that the
> >>>>> current mode of some of the public Datasets APIs is a dogmatic
> >>>>> view about how everything should always work, and it's possible that
> >>>>> some relatively minor changes could allow you to use it. So let's try
> >>>>> not to be closing any doors right now
> >>>>>
> >>>>>> Thanks,
> >>>>>> David
> >>>>>>
> >>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> >>>>>>>
> >>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
> >>>>>>>> Sure -
> >>>>>>>>
> >>>>>>>> The use case is to read a large partitioned dataset, consisting of
> >>>>>>>> tens or hundreds of Parquet files. A reader expects to scan through
> >>>>>>>> the data in order of the partition key. However, to improve
> >>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N + k
> >>>>>>>> while the consumer is still reading file N, so that it doesn't have to
> >>>>>>>> wait every time it opens a new file, and to help hide any latency or
> >>>>>>>> slowness that might be happening on the backend. We also don't want to
> >>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't, because
> >>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
> >>>>>>>
> >>>>>>> But depending on network conditions, you may very well get file N+2
> >>>>>>> before N+1, even if you start loading it after...
> >>>>>>>
> >>>>>>>> This is why I mention the project is quite similar to the Datasets
> >>>>>>>> project - Datasets likely covers all the functionality we would
> >>>>>>>> eventually need.
> >>>>>>>
> >>>>>>> Except that datasets are essentially unordered.
> >>>>>>>
> >>>>>>> Regards
> >>>>>>>
> >>>>>>> Antoine.
> >>>>>>>
> >>>>>
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Francois,

Thanks for the pointers. I'll see if I can put together a
proof-of-concept; might that help the discussion? I agree it would be good
to make it format-agnostic. I'm also curious what thoughts you'd have
on how to manage cross-file parallelism (coalescing only helps within
a file). If we just naively start scanning fragments in parallel, we'd
still want some way to help ensure the actual reads get issued roughly
in order of file (to avoid the problem discussed above, where reads
for file B prevent reads for file A from getting scheduled, where B
follows A from the consumer's standpoint).
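
As a rough illustration of issuing reads in file order with a bounded
window, here is a minimal Python sketch. It is not the library
described in the proposal and not an Arrow API: read_in_order, load and
readahead are hypothetical names, and the only property relied on is
that concurrent.futures.ThreadPoolExecutor starts queued work in
submission order.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel marking the end of the path iterator


def read_in_order(paths, load, readahead=2, max_workers=4):
    """Yield load(path) for each path, in order.

    While the consumer handles file N, loads for up to `readahead`
    following files are already submitted. Loads are submitted strictly
    in file order, so file N+2 is never queued before file N+1.
    """
    paths = iter(paths)
    pending = deque()  # futures, oldest (next to be consumed) first
    with ThreadPoolExecutor(max_workers=max_workers) as pool:

        def submit_next():
            path = next(paths, _DONE)
            if path is not _DONE:
                pending.append(pool.submit(load, path))

        # Prime the window: the current file plus `readahead` more.
        for _ in range(readahead + 1):
            submit_next()

        while pending:
            result = pending.popleft().result()  # blocks on the oldest file
            yield result
            submit_next()  # consumer came back for more: refill the window
```

Completion order can still vary with network conditions, as Antoine
pointed out; the sketch only bounds how far ahead reads are issued and
guarantees that results are handed to the consumer in file order.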

Antoine,

We would be interested in that as well. One thing we do want to
investigate is a better ReadAsync() implementation for S3File as
preliminary benchmarking on our side has shown it's quite inefficient
(the default implementation makes lots of memcpy()s).

Thanks,
David

On 4/30/20, Antoine Pitrou <an...@python.org> wrote:
>
> If we want to discuss IO APIs we should do that comprehensively.
> There are various ways of expressing what we want to do (explicit
> readahead, fadvise-like APIs, async APIs, etc.).
>
> Regards
>
> Antoine.
>
>
> Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
>> One more point,
>>
>> It would seem beneficial if we could express this in
>> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
>> buffering/coalescing would be needed. In the case of Parquet, we'd get
>> the _exact_ ranges computed from the metadata. This method would also
>> possibly benefit other filesystems since on linux it can call
>> `readahead` and/or `madvise`.
>>
>> François
>>
>>
>> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
>> <fs...@gmail.com> wrote:
>>>
>>> Hello David,
>>>
>>> I think that what you ask is achievable with the dataset API without
>>> much effort. You'd have to insert the pre-buffering at
>>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
>>> essentially a generator that looks like
>>> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
>>> fragment in-order. The application consuming the ScanTask could
>>> control the number of scheduled tasks by looking at the IO pool load.
>>>
>>> OTOH, It would be good if we could make this format agnostic, e.g.
>>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
>>> would be applicable to all formats, CSV, ipc, ...
>>>
>>> François
>>> [1]
>>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>>>
>>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
>>>>
>>>> Sure, and we are still interested in collaborating. The main use case
>>>> we have is scanning datasets in order of the partition key; it seems
>>>> ordering is the only missing thing from Antoine's comments. However,
>>>> from briefly playing around with the Python API, an application could
>>>> manually order the fragments if so desired, so that still works for
>>>> us, even if ordering isn't otherwise a guarantee.
>>>>
>>>> Performance-wise, we would want intra-file concurrency (coalescing)
>>>> and inter-file concurrency (buffering files in order, as described in
>>>> my previous messages). Even if Datasets doesn't directly handle this,
>>>> it'd be ideal if an application could achieve this if it were willing
>>>> to manage the details. I also vaguely remember seeing some interest in
>>>> things like being able to distribute a computation over a dataset via
>>>> Dask or some other distributed computation system, which would also be
>>>> interesting to us, though not a concrete requirement.
>>>>
>>>> I'd like to reference the original proposal document, which has more
>>>> detail on our workloads and use cases:
>>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
>>>> As described there, we have a library that implements both a
>>>> datasets-like API (hand it a remote directory, get back an Arrow
>>>> Table) and several optimizations to make that library perform
>>>> acceptably. Our motivation here is to be able to have a path to
>>>> migrate to using and contributing to Arrow Datasets, which we see as a
>>>> cross-language, cross-filesystem library, without regressing in
>>>> performance. (We are limited to Python and S3.)
>>>>
>>>> Best,
>>>> David
>>>>
>>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
>>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
>>>>>>
>>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>>>>>> guaranteed to download all the files in order, but with more control,
>>>>>> you can make this more likely. You can also prevent the case where due
>>>>>> to scheduling, file N+1 doesn't even start downloading until after
>>>>>> file N+2, which can happen if you just submit all reads to a thread
>>>>>> pool, as demonstrated in the linked trace.
>>>>>>
>>>>>> And again, with this level of control, you can also decide to reduce
>>>>>> or increase parallelism based on network conditions, memory usage,
>>>>>> other readers, etc. So it is both about improving/smoothing out
>>>>>> performance, and limiting resource consumption.
>>>>>>
>>>>>> Finally, I do not mean to propose that we necessarily build all of
>>>>>> this into Arrow, just that we would like to make it possible to
>>>>>> build this with Arrow, and that Datasets may find this interesting for
>>>>>> its optimization purposes, if concurrent reads are a goal.
>>>>>>
>>>>>>>  Except that datasets are essentially unordered.
>>>>>>
>>>>>> I did not realize this, but that means it's not really suitable for
>>>>>> our use case, unfortunately.
>>>>>
>>>>> It would be helpful to understand things a bit better so that we do
>>>>> not miss out on an opportunity to collaborate. I don't know that the
>>>>> current mode of some of the public Datasets APIs is a dogmatic
>>>>> view about how everything should always work, and it's possible that
>>>>> some relatively minor changes could allow you to use it. So let's try
>>>>> not to be closing any doors right now
>>>>>
>>>>>> Thanks,
>>>>>> David
>>>>>>
>>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>>>>>>>
>>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
>>>>>>>> Sure -
>>>>>>>>
>>>>>>>> The use case is to read a large partitioned dataset, consisting of
>>>>>>>> tens or hundreds of Parquet files. A reader expects to scan through
>>>>>>>> the data in order of the partition key. However, to improve
>>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N + k
>>>>>>>> while the consumer is still reading file N, so that it doesn't have to
>>>>>>>> wait every time it opens a new file, and to help hide any latency or
>>>>>>>> slowness that might be happening on the backend. We also don't want to
>>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't, because
>>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
>>>>>>>
>>>>>>> But depending on network conditions, you may very well get file N+2
>>>>>>> before N+1, even if you start loading it after...
>>>>>>>
>>>>>>>> This is why I mention the project is quite similar to the Datasets
>>>>>>>> project - Datasets likely covers all the functionality we would
>>>>>>>> eventually need.
>>>>>>>
>>>>>>> Except that datasets are essentially unordered.
>>>>>>>
>>>>>>> Regards
>>>>>>>
>>>>>>> Antoine.
>>>>>>>
>>>>>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
If we want to discuss IO APIs we should do that comprehensively.
There are various ways of expressing what we want to do (explicit
readahead, fadvise-like APIs, async APIs, etc.).

Regards

Antoine.


Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
> One more point,
> 
> It would seem beneficial if we could express this in
> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
> buffering/coalescing would be needed. In the case of Parquet, we'd get
> the _exact_ ranges computed from the metadata. This method would also
> possibly benefit other filesystems since on linux it can call
> `readahead` and/or `madvise`.
> 
> François
> 
> 
> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
> <fs...@gmail.com> wrote:
>>
>> Hello David,
>>
>> I think that what you ask is achievable with the dataset API without
>> much effort. You'd have to insert the pre-buffering at
>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
>> essentially a generator that looks like
>> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
>> fragment in-order. The application consuming the ScanTask could
>> control the number of scheduled tasks by looking at the IO pool load.
>>
>> OTOH, It would be good if we could make this format agnostic, e.g.
>> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
>> would be applicable to all formats, CSV, ipc, ...
>>
>> François
>> [1] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>>
>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
>>>
>>> Sure, and we are still interested in collaborating. The main use case
>>> we have is scanning datasets in order of the partition key; it seems
>>> ordering is the only missing thing from Antoine's comments. However,
>>> from briefly playing around with the Python API, an application could
>>> manually order the fragments if so desired, so that still works for
>>> us, even if ordering isn't otherwise a guarantee.
>>>
>>> Performance-wise, we would want intra-file concurrency (coalescing)
>>> and inter-file concurrency (buffering files in order, as described in
>>> my previous messages). Even if Datasets doesn't directly handle this,
>>> it'd be ideal if an application could achieve this if it were willing
>>> to manage the details. I also vaguely remember seeing some interest in
>>> things like being able to distribute a computation over a dataset via
>>> Dask or some other distributed computation system, which would also be
>>> interesting to us, though not a concrete requirement.
>>>
>>> I'd like to reference the original proposal document, which has more
>>> detail on our workloads and use cases:
>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
>>> As described there, we have a library that implements both a
>>> datasets-like API (hand it a remote directory, get back an Arrow
>>> Table) and several optimizations to make that library perform
>>> acceptably. Our motivation here is to be able to have a path to
>>> migrate to using and contributing to Arrow Datasets, which we see as a
>>> cross-language, cross-filesystem library, without regressing in
>>> performance. (We are limited to Python and S3.)
>>>
>>> Best,
>>> David
>>>
>>> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
>>>>>
>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>>>>> guaranteed to download all the files in order, but with more control,
>>>>> you can make this more likely. You can also prevent the case where due
>>>>> to scheduling, file N+1 doesn't even start downloading until after
>>>>> file N+2, which can happen if you just submit all reads to a thread
>>>>> pool, as demonstrated in the linked trace.
>>>>>
>>>>> And again, with this level of control, you can also decide to reduce
>>>>> or increase parallelism based on network conditions, memory usage,
>>>>> other readers, etc. So it is both about improving/smoothing out
>>>>> performance, and limiting resource consumption.
>>>>>
>>>>> Finally, I do not mean to propose that we necessarily build all of
>>>>> this into Arrow, just that we would like to make it possible to
>>>>> build this with Arrow, and that Datasets may find this interesting for
>>>>> its optimization purposes, if concurrent reads are a goal.
>>>>>
>>>>>>  Except that datasets are essentially unordered.
>>>>>
>>>>> I did not realize this, but that means it's not really suitable for
>>>>> our use case, unfortunately.
>>>>
>>>> It would be helpful to understand things a bit better so that we do
>>>> not miss out on an opportunity to collaborate. I don't know that the
>>>> current mode of some of the public Datasets APIs is a dogmatic
>>>> view about how everything should always work, and it's possible that
>>>> some relatively minor changes could allow you to use it. So let's try
>>>> not to be closing any doors right now
>>>>
>>>>> Thanks,
>>>>> David
>>>>>
>>>>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>>>>>>
>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
>>>>>>> Sure -
>>>>>>>
>>>>>>> The use case is to read a large partitioned dataset, consisting of
>>>>>>> tens or hundreds of Parquet files. A reader expects to scan through
>>>>>>> the data in order of the partition key. However, to improve
>>>>>>> performance, we'd like to begin loading files N+1, N+2, ... N + k
>>>>>>> while the consumer is still reading file N, so that it doesn't have to
>>>>>>> wait every time it opens a new file, and to help hide any latency or
>>>>>>> slowness that might be happening on the backend. We also don't want to
>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't, because
>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
>>>>>>
>>>>>> But depending on network conditions, you may very well get file N+2
>>>>>> before N+1, even if you start loading it after...
>>>>>>
>>>>>>> This is why I mention the project is quite similar to the Datasets
>>>>>>> project - Datasets likely covers all the functionality we would
>>>>>>> eventually need.
>>>>>>
>>>>>> Except that datasets are essentially unordered.
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>> Antoine.
>>>>>>
>>>>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Francois Saint-Jacques <fs...@gmail.com>.
One more point,

It would seem beneficial if we could express this in a
`RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
buffering/coalescing would be needed. In the case of Parquet, we'd get
the _exact_ ranges computed from the metadata. This method would also
possibly benefit other filesystems, since on Linux it can call
`readahead` and/or `madvise`.
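
The shape of such a call can be sketched in Python. This is a toy
model under assumptions, not an existing Arrow interface: ReadRange
mirrors the name used above, while PrefetchingFile, read_ahead and
read_at are hypothetical, an in-memory io.BytesIO stands in for the
remote file, and the prefetch is done synchronously where a real
implementation would issue the reads in the background or pass the
ranges on as OS hints.

```python
import io
from typing import NamedTuple


class ReadRange(NamedTuple):
    offset: int
    length: int


class PrefetchingFile:
    """Wraps a seekable binary file and serves prefetched ranges from memory."""

    def __init__(self, raw):
        self._raw = raw
        self._cache = {}    # ReadRange -> bytes fetched ahead of time
        self.raw_reads = 0  # reads that actually hit the underlying file

    def _read_raw(self, offset, length):
        self.raw_reads += 1
        self._raw.seek(offset)
        return self._raw.read(length)

    def read_ahead(self, ranges):
        """Fetch the exact ranges the caller says it will need."""
        for r in ranges:
            self._cache[r] = self._read_raw(r.offset, r.length)

    def read_at(self, offset, length):
        """Serve a read from the prefetched data if the range matches exactly."""
        key = ReadRange(offset, length)
        if key in self._cache:
            return self._cache.pop(key)
        return self._read_raw(offset, length)


data = bytes(range(100))
f = PrefetchingFile(io.BytesIO(data))
f.read_ahead([ReadRange(10, 5), ReadRange(50, 3)])  # e.g. two column chunks
chunk = f.read_at(10, 5)  # served from the cache, no extra raw read
```

The caller supplies the exact ranges up front, so the file object needs
no heuristics about what to buffer or coalesce.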

François


On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
<fs...@gmail.com> wrote:
>
> Hello David,
>
> I think that what you ask is achievable with the dataset API without
> much effort. You'd have to insert the pre-buffering at
> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
> essentially a generator that looks like
> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
> fragment in-order. The application consuming the ScanTask could
> control the number of scheduled tasks by looking at the IO pool load.
>
> OTOH, It would be good if we could make this format agnostic, e.g.
> offer this via a ScanOptions toggle, e.g. "readahead_files" and this
> would be applicable to all formats, CSV, ipc, ...
>
> François
> [1] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
>
> On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
> >
> > Sure, and we are still interested in collaborating. The main use case
> > we have is scanning datasets in order of the partition key; it seems
> > ordering is the only missing thing from Antoine's comments. However,
> > from briefly playing around with the Python API, an application could
> > manually order the fragments if so desired, so that still works for
> > us, even if ordering isn't otherwise a guarantee.
> >
> > Performance-wise, we would want intra-file concurrency (coalescing)
> > and inter-file concurrency (buffering files in order, as described in
> > my previous messages). Even if Datasets doesn't directly handle this,
> > it'd be ideal if an application could achieve this if it were willing
> > to manage the details. I also vaguely remember seeing some interest in
> > things like being able to distribute a computation over a dataset via
> > Dask or some other distributed computation system, which would also be
> > interesting to us, though not a concrete requirement.
> >
> > I'd like to reference the original proposal document, which has more
> > detail on our workloads and use cases:
> > https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> > As described there, we have a library that implements both a
> > datasets-like API (hand it a remote directory, get back an Arrow
> > Table) and several optimizations to make that library perform
> > acceptably. Our motivation here is to be able to have a path to
> > migrate to using and contributing to Arrow Datasets, which we see as a
> > cross-language, cross-filesystem library, without regressing in
> > performance. (We are limited to Python and S3.)
> >
> > Best,
> > David
> >
> > On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> > > On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
> > >>
> > >> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> > >> guaranteed to download all the files in order, but with more control,
> > >> you can make this more likely. You can also prevent the case where due
> > >> to scheduling, file N+1 doesn't even start downloading until after
> > >> file N+2, which can happen if you just submit all reads to a thread
> > >> pool, as demonstrated in the linked trace.
> > >>
> > >> And again, with this level of control, you can also decide to reduce
> > >> or increase parallelism based on network conditions, memory usage,
> > >> other readers, etc. So it is both about improving/smoothing out
> > >> performance, and limiting resource consumption.
> > >>
> > >> Finally, I do not mean to propose that we necessarily build all of
> > >> this into Arrow, just that we would like to make it possible to
> > >> build this with Arrow, and that Datasets may find this interesting for
> > >> its optimization purposes, if concurrent reads are a goal.
> > >>
> > >> >  Except that datasets are essentially unordered.
> > >>
> > >> I did not realize this, but that means it's not really suitable for
> > >> our use case, unfortunately.
> > >
> > > It would be helpful to understand things a bit better so that we do
> > > not miss out on an opportunity to collaborate. I don't know that the
> > > current mode of some of the public Datasets APIs is a dogmatic
> > > view about how everything should always work, and it's possible that
> > > some relatively minor changes could allow you to use it. So let's try
> > > not to be closing any doors right now
> > >
> > >> Thanks,
> > >> David
> > >>
> > >> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> > >> >
> > >> > Le 29/04/2020 à 23:30, David Li a écrit :
> > >> >> Sure -
> > >> >>
> > >> >> The use case is to read a large partitioned dataset, consisting of
> > >> >> tens or hundreds of Parquet files. A reader expects to scan through
> > >> >> the data in order of the partition key. However, to improve
> > >> >> performance, we'd like to begin loading files N+1, N+2, ... N + k
> > >> >> while the consumer is still reading file N, so that it doesn't have to
> > >> >> wait every time it opens a new file, and to help hide any latency or
> > >> >> slowness that might be happening on the backend. We also don't want to
> > >> >> be in a situation where file N+2 is ready but file N+1 isn't, because
> > >> >> that doesn't help us (we still have to wait for N+1 to load).
> > >> >
> > >> > But depending on network conditions, you may very well get file N+2
> > >> > before N+1, even if you start loading it after...
> > >> >
> > >> >> This is why I mention the project is quite similar to the Datasets
> > >> >> project - Datasets likely covers all the functionality we would
> > >> >> eventually need.
> > >> >
> > >> > Except that datasets are essentially unordered.
> > >> >
> > >> > Regards
> > >> >
> > >> > Antoine.
> > >> >
> > >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Francois Saint-Jacques <fs...@gmail.com>.
Hello David,

I think that what you ask is achievable with the dataset API without
much effort. You'd have to insert the pre-buffering at
ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
essentially a generator that looks like
flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
fragments in order. The application consuming the ScanTasks could
control the number of scheduled tasks by looking at the IO pool load.
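
The generator shape described above, a lazy flatmap over fragments that
consumes them in order, can be sketched in Python. This is only an
analogy for the C++ Scanner, not its implementation: scan and scan_file
are hypothetical names, and fragments and tasks are plain strings.

```python
from itertools import chain


def scan(fragments, scan_file):
    """Lazily flatten per-fragment task iterators, preserving fragment order.

    `scan_file(fragment)` returns an iterator of tasks for one fragment.
    It is only called when the consumer reaches that fragment.
    """
    return chain.from_iterable(map(scan_file, fragments))


opened = []  # records which fragments have been opened so far


def scan_file(fragment):
    opened.append(fragment)
    return iter([f"{fragment}-task0", f"{fragment}-task1"])


tasks = scan(["frag0", "frag1"], scan_file)
first = next(tasks)  # only frag0 has been opened at this point
```

Because nothing is opened until the consumer asks for it, this is the
point where pre-buffering or a readahead window would have to be
inserted to overlap I/O with consumption.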

OTOH, it would be good if we could make this format-agnostic, e.g. by
offering it via a ScanOptions toggle such as "readahead_files", which
would be applicable to all formats (CSV, IPC, ...).

François
[1] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401

On Thu, Apr 30, 2020 at 8:20 AM David Li <li...@gmail.com> wrote:
>
> Sure, and we are still interested in collaborating. The main use case
> we have is scanning datasets in order of the partition key; it seems
> ordering is the only missing thing from Antoine's comments. However,
> from briefly playing around with the Python API, an application could
> manually order the fragments if so desired, so that still works for
> us, even if ordering isn't otherwise a guarantee.
>
> Performance-wise, we would want intra-file concurrency (coalescing)
> and inter-file concurrency (buffering files in order, as described in
> my previous messages). Even if Datasets doesn't directly handle this,
> it'd be ideal if an application could achieve this if it were willing
> to manage the details. I also vaguely remember seeing some interest in
> things like being able to distribute a computation over a dataset via
> Dask or some other distributed computation system, which would also be
> interesting to us, though not a concrete requirement.
>
> I'd like to reference the original proposal document, which has more
> detail on our workloads and use cases:
> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> As described there, we have a library that implements both a
> datasets-like API (hand it a remote directory, get back an Arrow
> Table) and several optimizations to make that library perform
> acceptably. Our motivation here is to be able to have a path to
> migrate to using and contributing to Arrow Datasets, which we see as a
> cross-language, cross-filesystem library, without regressing in
> performance. (We are limited to Python and S3.)
>
> Best,
> David
>
> On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> > On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
> >>
> >> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> >> guaranteed to download all the files in order, but with more control,
> >> you can make this more likely. You can also prevent the case where due
> >> to scheduling, file N+1 doesn't even start downloading until after
> >> file N+2, which can happen if you just submit all reads to a thread
> >> pool, as demonstrated in the linked trace.
> >>
> >> And again, with this level of control, you can also decide to reduce
> >> or increase parallelism based on network conditions, memory usage,
> >> other readers, etc. So it is both about improving/smoothing out
> >> performance, and limiting resource consumption.
> >>
> >> Finally, I do not mean to propose that we necessarily build all of
> >> this into Arrow, just that we would like to make it possible to
> >> build this with Arrow, and that Datasets may find this interesting for
> >> its optimization purposes, if concurrent reads are a goal.
> >>
> >> >  Except that datasets are essentially unordered.
> >>
> >> I did not realize this, but that means it's not really suitable for
> >> our use case, unfortunately.
> >
> > It would be helpful to understand things a bit better so that we do
> > not miss out on an opportunity to collaborate. I don't know that the
> > current mode of some of the public Datasets APIs is a dogmatic
> > view about how everything should always work, and it's possible that
> > some relatively minor changes could allow you to use it. So let's try
> > not to be closing any doors right now.
> >
> >> Thanks,
> >> David
> >>
> >> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> >> >
> >> > Le 29/04/2020 à 23:30, David Li a écrit :
> >> >> Sure -
> >> >>
> >> >> The use case is to read a large partitioned dataset, consisting of
> >> >> tens or hundreds of Parquet files. A reader expects to scan through
> >> >> the data in order of the partition key. However, to improve
> >> >> performance, we'd like to begin loading files N+1, N+2, ... N + k
> >> >> while the consumer is still reading file N, so that it doesn't have to
> >> >> wait every time it opens a new file, and to help hide any latency or
> >> >> slowness that might be happening on the backend. We also don't want to
> >> >> be in a situation where file N+2 is ready but file N+1 isn't, because
> >> >> that doesn't help us (we still have to wait for N+1 to load).
> >> >
> >> > But depending on network conditions, you may very well get file N+2
> >> > before N+1, even if you start loading it after...
> >> >
> >> >> This is why I mention the project is quite similar to the Datasets
> >> >> project - Datasets likely covers all the functionality we would
> >> >> eventually need.
> >> >
> >> > Except that datasets are essentially unordered.
> >> >
> >> > Regards
> >> >
> >> > Antoine.
> >> >
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Sure, and we are still interested in collaborating. The main use case
we have is scanning datasets in order of the partition key; judging
from Antoine's comments, ordering seems to be the only missing piece.
However, from briefly playing around with the Python API, an
application could manually order the fragments if so desired, so that
still works for us, even if ordering isn't otherwise a guarantee.
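
A minimal sketch of what "manually ordering the fragments" could look
like on the application side. It assumes hive-style paths (key=value
directory segments), which this thread does not specify, and the helper
names partition_value and order_by_partition are hypothetical; they are
not part of the Datasets API. Values are compared as strings, so this
only gives the intended order for keys such as ISO dates that sort
lexicographically.

```python
def partition_value(path, key):
    """Return the value of `key` from a hive-style path.

    Example of the assumed layout: 'data/date=2020-01-02/part-0.parquet'.
    """
    for segment in path.split("/"):
        name, sep, value = segment.partition("=")
        if sep and name == key:
            return value
    raise ValueError(f"{key!r} not found in {path!r}")


def order_by_partition(paths, key):
    """Sort file paths by partition value, breaking ties by path."""
    return sorted(paths, key=lambda p: (partition_value(p, key), p))
```

The application would then open and scan the files in the returned
order, independently of the order in which a scan happens to complete.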

Performance-wise, we would want intra-file concurrency (coalescing)
and inter-file concurrency (buffering files in order, as described in
my previous messages). Even if Datasets doesn't directly handle this,
it'd be ideal if an application could achieve this if it were willing
to manage the details. I also vaguely remember seeing some interest in
things like being able to distribute a computation over a dataset via
Dask or some other distributed computation system, which would also be
interesting to us, though not a concrete requirement.

I'd like to reference the original proposal document, which has more
detail on our workloads and use cases:
https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
As described there, we have a library that implements both a
datasets-like API (hand it a remote directory, get back an Arrow
Table) and several optimizations to make that library perform
acceptably. Our motivation here is to be able to have a path to
migrate to using and contributing to Arrow Datasets, which we see as a
cross-language, cross-filesystem library, without regressing in
performance. (Our own library is limited to Python and S3.)

Best,
David

On 4/29/20, Wes McKinney <we...@gmail.com> wrote:
> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
>>
>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
>> guaranteed to download all the files in order, but with more control,
>> you can make this more likely. You can also prevent the case where due
>> to scheduling, file N+1 doesn't even start downloading until after
>> file N+2, which can happen if you just submit all reads to a thread
>> pool, as demonstrated in the linked trace.
>>
>> And again, with this level of control, you can also decide to reduce
>> or increase parallelism based on network conditions, memory usage,
>> other readers, etc. So it is both about improving/smoothing out
>> performance, and limiting resource consumption.
>>
>> Finally, I do not mean to propose that we necessarily build all of
>> this into Arrow, just that we would like to make it possible to
>> build this with Arrow, and that Datasets may find this interesting for
>> its optimization purposes, if concurrent reads are a goal.
>>
>> >  Except that datasets are essentially unordered.
>>
>> I did not realize this, but that means it's not really suitable for
>> our use case, unfortunately.
>
> It would be helpful to understand things a bit better so that we do
> not miss out on an opportunity to collaborate. I don't know that the
> current mode of some of the public Datasets APIs is a dogmatic
> view about how everything should always work, and it's possible that
> some relatively minor changes could allow you to use it. So let's try
> not to be closing any doors right now.
>
>> Thanks,
>> David
>>
>> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>> >
>> > Le 29/04/2020 à 23:30, David Li a écrit :
>> >> Sure -
>> >>
>> >> The use case is to read a large partitioned dataset, consisting of
>> >> tens or hundreds of Parquet files. A reader expects to scan through
>> >> the data in order of the partition key. However, to improve
>> >> performance, we'd like to begin loading files N+1, N+2, ... N + k
>> >> while the consumer is still reading file N, so that it doesn't have to
>> >> wait every time it opens a new file, and to help hide any latency or
>> >> slowness that might be happening on the backend. We also don't want to
>> >> be in a situation where file N+2 is ready but file N+1 isn't, because
>> >> that doesn't help us (we still have to wait for N+1 to load).
>> >
>> > But depending on network conditions, you may very well get file N+2
>> > before N+1, even if you start loading it after...
>> >
>> >> This is why I mention the project is quite similar to the Datasets
>> >> project - Datasets likely covers all the functionality we would
>> >> eventually need.
>> >
>> > Except that datasets are essentially unordered.
>> >
>> > Regards
>> >
>> > Antoine.
>> >
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Joris Van den Bossche <jo...@gmail.com>.
On Thu, 30 Apr 2020 at 04:06, Wes McKinney <we...@gmail.com> wrote:

> On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
> >
> > Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> > guaranteed to download all the files in order, but with more control,
> > you can make this more likely. You can also prevent the case where due
> > to scheduling, file N+1 doesn't even start downloading until after
> > file N+2, which can happen if you just submit all reads to a thread
> > pool, as demonstrated in the linked trace.
> >
> > And again, with this level of control, you can also decide to reduce
> > or increase parallelism based on network conditions, memory usage,
> > other readers, etc. So it is both about improving/smoothing out
> > performance, and limiting resource consumption.
> >
> > Finally, I do not mean to propose that we necessarily build all of
> > this into Arrow, just that we would like to make it possible to
> > build this with Arrow, and that Datasets may find this interesting for
> > its optimization purposes, if concurrent reads are a goal.
> >
> > >  Except that datasets are essentially unordered.
> >
> > I did not realize this, but that means it's not really suitable for
> > our use case, unfortunately.
>
> It would be helpful to understand things a bit better so that we do
> not miss out on an opportunity to collaborate. I don't know that the
> current mode of some of the public Datasets APIs is a dogmatic
> view about how everything should always work, and it's possible that
> some relatively minor changes could allow you to use it. So let's try
> not to be closing any doors right now.
>

Note that a Dataset itself is actually ordered, AFAIK. Meaning: the list of
Fragments it is composed of is an ordered vector. It's only when, e.g.,
consuming scan tasks that the result might not be ordered (this is
currently the case in ToTable, but see
https://issues.apache.org/jira/browse/ARROW-8447 for an issue about
potentially changing this).


> > Thanks,
> > David
> >
> > On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> > >
> > > Le 29/04/2020 à 23:30, David Li a écrit :
> > >> Sure -
> > >>
> > >> The use case is to read a large partitioned dataset, consisting of
> > >> tens or hundreds of Parquet files. A reader expects to scan through
> > >> the data in order of the partition key. However, to improve
> > >> performance, we'd like to begin loading files N+1, N+2, ... N + k
> > >> while the consumer is still reading file N, so that it doesn't have to
> > >> wait every time it opens a new file, and to help hide any latency or
> > >> slowness that might be happening on the backend. We also don't want to
> > >> be in a situation where file N+2 is ready but file N+1 isn't, because
> > >> that doesn't help us (we still have to wait for N+1 to load).
> > >
> > > But depending on network conditions, you may very well get file N+2
> > > before N+1, even if you start loading it after...
> > >
> > >> This is why I mention the project is quite similar to the Datasets
> > >> project - Datasets likely covers all the functionality we would
> > >> eventually need.
> > >
> > > Except that datasets are essentially unordered.
> > >
> > > Regards
> > >
> > > Antoine.
> > >
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Wed, Apr 29, 2020 at 6:54 PM David Li <li...@gmail.com> wrote:
>
> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> guaranteed to download all the files in order, but with more control,
> you can make this more likely. You can also prevent the case where due
> to scheduling, file N+1 doesn't even start downloading until after
> file N+2, which can happen if you just submit all reads to a thread
> pool, as demonstrated in the linked trace.
>
> And again, with this level of control, you can also decide to reduce
> or increase parallelism based on network conditions, memory usage,
> other readers, etc. So it is both about improving/smoothing out
> performance, and limiting resource consumption.
>
> Finally, I do not mean to propose that we necessarily build all of
> this into Arrow, just that we would like to make it possible to
> build this with Arrow, and that Datasets may find this interesting for
> its optimization purposes, if concurrent reads are a goal.
>
> >  Except that datasets are essentially unordered.
>
> I did not realize this, but that means it's not really suitable for
> our use case, unfortunately.

It would be helpful to understand things a bit better so that we do
not miss out on an opportunity to collaborate. I don't know that the
current mode of some of the public Datasets APIs is a dogmatic
view about how everything should always work, and it's possible that
some relatively minor changes could allow you to use it. So let's try
not to be closing any doors right now.

> Thanks,
> David
>
> On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
> >
> > Le 29/04/2020 à 23:30, David Li a écrit :
> >> Sure -
> >>
> >> The use case is to read a large partitioned dataset, consisting of
> >> tens or hundreds of Parquet files. A reader expects to scan through
> >> the data in order of the partition key. However, to improve
> >> performance, we'd like to begin loading files N+1, N+2, ... N + k
> >> while the consumer is still reading file N, so that it doesn't have to
> >> wait every time it opens a new file, and to help hide any latency or
> >> slowness that might be happening on the backend. We also don't want to
> >> be in a situation where file N+2 is ready but file N+1 isn't, because
> >> that doesn't help us (we still have to wait for N+1 to load).
> >
> > But depending on network conditions, you may very well get file N+2
> > before N+1, even if you start loading it after...
> >
> >> This is why I mention the project is quite similar to the Datasets
> >> project - Datasets likely covers all the functionality we would
> >> eventually need.
> >
> > Except that datasets are essentially unordered.
> >
> > Regards
> >
> > Antoine.
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
guaranteed to download all the files in order, but with more control,
you can make this more likely. You can also prevent the case where due
to scheduling, file N+1 doesn't even start downloading until after
file N+2, which can happen if you just submit all reads to a thread
pool, as demonstrated in the linked trace.

And again, with this level of control, you can also decide to reduce
or increase parallelism based on network conditions, memory usage,
other readers, etc. So it is both about improving/smoothing out
performance, and limiting resource consumption.

Finally, I do not mean to propose that we necessarily build all of
this into Arrow, just that we would like to make it possible to
build this with Arrow, and that Datasets may find this interesting for
its optimization purposes, if concurrent reads are a goal.

>  Except that datasets are essentially unordered.

I did not realize this, but that means it's not really suitable for
our use case, unfortunately.

Thanks,
David

On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>
> Le 29/04/2020 à 23:30, David Li a écrit :
>> Sure -
>>
>> The use case is to read a large partitioned dataset, consisting of
>> tens or hundreds of Parquet files. A reader expects to scan through
>> the data in order of the partition key. However, to improve
>> performance, we'd like to begin loading files N+1, N+2, ... N + k
>> while the consumer is still reading file N, so that it doesn't have to
>> wait every time it opens a new file, and to help hide any latency or
>> slowness that might be happening on the backend. We also don't want to
>> be in a situation where file N+2 is ready but file N+1 isn't, because
>> that doesn't help us (we still have to wait for N+1 to load).
>
> But depending on network conditions, you may very well get file N+2
> before N+1, even if you start loading it after...
>
>> This is why I mention the project is quite similar to the Datasets
>> project - Datasets likely covers all the functionality we would
>> eventually need.
>
> Except that datasets are essentially unordered.
>
> Regards
>
> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
Le 29/04/2020 à 23:30, David Li a écrit :
> Sure -
> 
> The use case is to read a large partitioned dataset, consisting of
> tens or hundreds of Parquet files. A reader expects to scan through
> the data in order of the partition key. However, to improve
> performance, we'd like to begin loading files N+1, N+2, ... N + k
> while the consumer is still reading file N, so that it doesn't have to
> wait every time it opens a new file, and to help hide any latency or
> slowness that might be happening on the backend. We also don't want to
> be in a situation where file N+2 is ready but file N+1 isn't, because
> that doesn't help us (we still have to wait for N+1 to load).

But depending on network conditions, you may very well get file N+2
before N+1, even if you start loading it after...

> This is why I mention the project is quite similar to the Datasets
> project - Datasets likely covers all the functionality we would
> eventually need.

Except that datasets are essentially unordered.

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Sure -

The use case is to read a large partitioned dataset, consisting of
tens or hundreds of Parquet files. A reader expects to scan through
the data in order of the partition key. However, to improve
performance, we'd like to begin loading files N+1, N+2, ... N + k
while the consumer is still reading file N, so that it doesn't have to
wait every time it opens a new file, and to help hide any latency or
slowness that might be happening on the backend. We also don't want to
be in a situation where file N+2 is ready but file N+1 isn't, because
that doesn't help us (we still have to wait for N+1 to load).

This is why I mention the project is quite similar to the Datasets
project - Datasets likely covers all the functionality we would
eventually need.

Best,
David

On 4/29/20, Antoine Pitrou <an...@python.org> wrote:
>
> Le 29/04/2020 à 20:49, David Li a écrit :
>>
>> However, we noticed this doesn’t actually bring us the expected
>> benefits. Consider files A, B, and C being buffered in parallel; right
>> now, all I/O goes through an internal I/O pool, and so several
>> operations for each of the three files get added to the pool. However,
>> they get serviced in some random order, and so it’s possible for file
>> C to finish all its I/O operations before file B can. Then, a consumer
>> is unnecessarily stuck waiting for those to complete.
>
> It would be good if you explained your use case a bit more precisely.
> Are you expecting the files to be read in a particular order?  If so, why?
>
> Regards
>
> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
Le 29/04/2020 à 20:49, David Li a écrit :
> 
> However, we noticed this doesn’t actually bring us the expected
> benefits. Consider files A, B, and C being buffered in parallel; right
> now, all I/O goes through an internal I/O pool, and so several
> operations for each of the three files get added to the pool. However,
> they get serviced in some random order, and so it’s possible for file
> C to finish all its I/O operations before file B can. Then, a consumer
> is unnecessarily stuck waiting for those to complete.

It would be good if you explained your use case a bit more precisely.
Are you expecting the files to be read in a particular order?  If so, why?

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Hi all,

I’d like to follow up on this discussion. Thanks to Antoine, we now
have a read coalescing implementation in-tree which shows clear
performance benefits both when reading plain files and Parquet
files[1]. We now have some follow-up work where we think the design
and implementation could be interesting to the Datasets project. Some
context follows.

We’re using coalescing while loading multiple files in parallel, and
have found that we don’t get the expected speedup. We took the
straightforward approach: buffer up to N additional files in the
background using multiple threads, and read from each file in
sequence, adding more files to the tail as the files at the head are
consumed. This way, we begin loading data for future files while
deserializing/processing the current file.
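
A minimal sketch of the straightforward buffering approach described
above, using only the Python standard library. The `load` callable is a
hypothetical placeholder for "fetch and decode one file", and
`read_in_order` and `lookahead` are names chosen for this sketch; none
of them come from Arrow.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def read_in_order(paths, load, lookahead=2):
    """Yield load(path) for each path, in the order given.

    Up to `lookahead` files beyond the current one are loaded in the
    background while the caller processes the current result.
    """
    pending = deque()
    remaining = iter(paths)
    with ThreadPoolExecutor(max_workers=lookahead + 1) as pool:
        # Fill the initial window: the current file plus `lookahead` more.
        for path in remaining:
            pending.append(pool.submit(load, path))
            if len(pending) > lookahead:
                break
        while pending:
            # Block on the head of the queue, never on a later file.
            result = pending.popleft().result()
            # Add one more file at the tail before yielding to the consumer.
            for path in remaining:
                pending.append(pool.submit(load, path))
                break
            yield result
```

This keeps results in order at the file level, but it has no say over
how the individual reads inside each `load` call are scheduled once
they reach a shared I/O pool.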

However, we noticed this doesn’t actually bring us the expected
benefits. Consider files A, B, and C being buffered in parallel; right
now, all I/O goes through an internal I/O pool, and so several
operations for each of the three files get added to the pool. However,
they get serviced in some random order, and so it’s possible for file
C to finish all its I/O operations before file B can. Then, a consumer
is unnecessarily stuck waiting for those to complete.

As a demonstration, see the third chart in this visualization:
https://www.lidavidm.me/arrow/coalescing/vis.html
For context: this is a plot of trace spans, where the x-axis is time,
the y-axis is thread ID, and color-coding indicates which file the
span is associated with. As you can see, there is a large gap because
file_1.parquet gets scheduled after file_2.parquet and file_3.parquet,
and so the consumer thread (row 3 from the bottom) is stuck waiting
for it the whole time.

Our proposed approach is to give the application enough control that
it can coordinate I/O across separate files. Returning to the example
above, if the application could discover and execute the I/O
operations for a Parquet file (or FeatherV2, etc.) separately, the
application could hold off on executing I/O for file C until all I/O
for file B has at least started. For example, in the current
framework, if we could tell how many I/O calls would happen when
reading from a Parquet file, we could insert a countdown latch in
between the Parquet file and the RandomAccessFile, which enqueues
reads and prevents them from progressing until all reads for the
previous file have made it through.
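
One way the countdown-latch idea could be sketched in Python, assuming
the number of reads per file is known up front. `StartLatch` and
`GatedFile` are hypothetical names, and the wrapped `read_at` callable
stands in for a RandomAccessFile; this is an illustration of the idea,
not an Arrow API.

```python
import threading


class StartLatch:
    """Counts down as reads start; opens once all expected reads have started."""

    def __init__(self, expected_reads):
        self._remaining = expected_reads
        self._lock = threading.Lock()
        self._opened = threading.Event()
        if expected_reads <= 0:
            self._opened.set()

    def count_down(self):
        with self._lock:
            self._remaining -= 1
            if self._remaining <= 0:
                self._opened.set()

    def wait(self):
        self._opened.wait()


class GatedFile:
    """Wraps a reader and holds its reads until the previous file's latch opens."""

    def __init__(self, read_at, expected_reads, previous=None):
        self._read_at = read_at      # callable(offset, length) -> bytes
        self._previous = previous    # StartLatch of the preceding file, or None
        self.latch = StartLatch(expected_reads)

    def read_at(self, offset, length):
        if self._previous is not None:
            # Wait until every read of the previous file has been started.
            self._previous.wait()
        # Mark this read as started, then perform it.
        self.latch.count_down()
        return self._read_at(offset, length)
```

Note that this sketch blocks the calling thread while it waits. In a
fixed-size pool that can deadlock if gated reads occupy every worker, so
a real implementation would more likely queue the deferred reads than
block on them.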

This has additional benefits: the application can lower concurrency to
limit memory usage. Or conversely, if it notices many small reads are
being issued, it can raise concurrency to keep bandwidth utilization
high while being confident that it isn’t using additional memory. By
controlling the ordering, the data consumer also gets more consistent
(less variation) read performance.

Concretely, for the PR adding coalescing to Parquet[1], I’d want to
rework it so that the Parquet reader can be asked for byte ranges
given a set of row groups and column indices. Then, I’d like to rework
the reader APIs to optionally accept a pre-populated ReadCache, so
that the application can populate the ReadCache as it wants. Any
further control over I/O would be done by the application, by wrapping
RandomAccessFile as appropriate.
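
For readers unfamiliar with coalescing, here is a minimal sketch of what
an application might do with the byte ranges once it has them: merge
ranges separated by small gaps and split ranges that are too large. This
is not the in-tree implementation; the function name and the parameters
`hole_size_limit` and `range_size_limit` are illustrative, and the
splitting policy is an assumption.

```python
def coalesce_ranges(ranges, hole_size_limit, range_size_limit):
    """Merge nearby (offset, length) ranges and split oversized ones.

    ranges: iterable of (offset, length) tuples.
    hole_size_limit: merge two ranges if the gap between them is at most this.
    range_size_limit: maximum length of any emitted range (must be > 0).
    """
    if range_size_limit <= 0:
        raise ValueError("range_size_limit must be positive")

    merged = []  # list of [start, end) pairs, kept sorted by start
    for offset, length in sorted(r for r in ranges if r[1] > 0):
        end = offset + length
        if merged and offset - merged[-1][1] <= hole_size_limit:
            # Close enough to the previous range: extend it (covers overlap too).
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([offset, end])

    result = []
    for start, end in merged:
        # Split large merged ranges so they can be fetched concurrently.
        while end - start > range_size_limit:
            result.append((start, range_size_limit))
            start += range_size_limit
        result.append((start, end - start))
    return result
```

The two limits would be tuned per storage system: a larger hole limit
trades wasted bytes for fewer requests, and a smaller range limit trades
more requests for more parallelism.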

We realize Datasets could do a lot of this, especially as it already
works with multiple files and Amazon S3, and has a JNI binding in
progress (Java being the main impetus of our current project).
However, we’d feel more comfortable building on Datasets once the API
is more stable. In the meantime, though, we would be interested in
pursuing this as an improvement to Datasets. I’m still not familiar
enough with the project and its roadmap to know how this would fit in,
however - but does this sound like it would want to be addressed by
the project eventually?

Best,
David

[1]: https://github.com/apache/arrow/pull/6744#issuecomment-607431959
Note some caveats on the numbers there - the rates are only correct
relative to each other since the benchmark doesn’t measure the actual
size of the deserialized data.


On 3/23/20, David Li <li...@gmail.com> wrote:
> Thanks. I've set up an AWS account for my own testing for now. I've
> also submitted a PR to add a basic benchmark which can be run
> self-contained, against a local Minio instance, or against S3:
> https://github.com/apache/arrow/pull/6675
>
> I ran the benchmark from my local machine, and I can test from EC2
> sometime as well. Performance is not ideal, but I'm being limited by
> my home internet connection - coalescing small chunked reads is (as
> expected) as fast as reading the file in one go, and in the PR
> (testing against localhost where we're not limited by bandwidth), it's
> faster than either option.
>
> --------------------------------------------------------------------------------------------------------------------
> Benchmark                                            Time            CPU  Iterations
> --------------------------------------------------------------------------------------------------------------------
> MinioFixture/ReadAll1Mib/real_time           223416933 ns     9098743 ns         413  4.47594MB/s    4.47594 items/s
> MinioFixture/ReadAll100Mib/real_time        6068938152 ns   553319299 ns          10  16.4773MB/s   0.164773 items/s
> MinioFixture/ReadAll500Mib/real_time       30735046155 ns  2620718364 ns           2  16.2681MB/s  0.0325361 items/s
> MinioFixture/ReadChunked100Mib/real_time    9625661666 ns   448637141 ns          12  10.3889MB/s   0.103889 items/s
> MinioFixture/ReadChunked500Mib/real_time   58736796101 ns  2070237834 ns           2  8.51255MB/s  0.0170251 items/s
> MinioFixture/ReadCoalesced100Mib/real_time  6982902546 ns    22553824 ns          10  14.3207MB/s   0.143207 items/s
> MinioFixture/ReadCoalesced500Mib/real_time 29923239648 ns   112736805 ns           3  16.7094MB/s  0.0334188 items/s
> MinioFixture/ReadParquet250K/real_time     21934689795 ns  2052758161 ns           3  9.90955MB/s  0.0455899 items/s
>
> Best,
> David
>
>
> On 3/22/20, Wes McKinney <we...@gmail.com> wrote:
>> On Thu, Mar 19, 2020 at 10:04 AM David Li <li...@gmail.com> wrote:
>>>
>>> > That's why it's important that we set ourselves up to do performance
>>> > testing in a realistic environment in AWS rather than simulating it.
>>>
>>> For my clarification, what are the plans for this (if any)? I couldn't
>>> find any prior discussion, though it sounds like the discussion around
>>> cloud CI capacity would be one step towards this.
>>>
>>> In the short term we could make tests/benchmarks configurable to not
>>> point at a Minio instance so individual developers can at least try
>>> things.
>>
>> It probably makes sense to begin investing in somewhat portable
>> tooling to assist with running S3-related unit tests and benchmarks
>> inside AWS. This could include initial Parquet dataset generation and
>> other things.
>>
>> As far as testing, I'm happy to pay for some AWS costs (within
>> reason). AWS might be able to donate some credits to us also
>>
>>> Best,
>>> David
>>>
>>> On 3/18/20, David Li <li...@gmail.com> wrote:
>>> > For us it applies to S3-like systems, not only S3 itself, at least.
>>> >
>>> > It does make sense to limit it to some filesystems. The behavior would
>>> > be opt-in at the Parquet reader level, so at the Datasets or
>>> > Filesystem layer we can take care of enabling the flag for filesystems
>>> > where it actually helps.
>>> >
>>> > I've filed these issues:
>>> > - ARROW-8151 to benchmark S3File+Parquet
>>> > (https://issues.apache.org/jira/browse/ARROW-8151)
>>> > - ARROW-8152 to split large reads
>>> > (https://issues.apache.org/jira/browse/ARROW-8152)
>>> > - PARQUET-1820 to use a column filter hint with coalescing
>>> > (https://issues.apache.org/jira/browse/PARQUET-1820)
>>> >
>>> > in addition to PARQUET-1698 which is just about pre-buffering the
>>> > entire row group (which we can now do with ARROW-7995).
>>> >
>>> > Best,
>>> > David
>>> >
>>> > On 3/18/20, Antoine Pitrou <an...@python.org> wrote:
>>> >>
>>> >> Le 18/03/2020 à 18:30, David Li a écrit :
>>> >>>> Instead of S3, you can use the Slow streams and Slow filesystem
>>> >>>> implementations.  It may better protect against varying external
>>> >>>> conditions.
>>> >>>
>>> >>> I think we'd want several different benchmarks - we want to ensure
>>> >>> we
>>> >>> don't regress local filesystem performance, and we also want to
>>> >>> measure in an actual S3 environment. It would also be good to
>>> >>> measure
>>> >>> S3-compatible systems like Google's.
>>> >>>
>>> >>>>> - Use the coalescing inside the Parquet reader (even without a
>>> >>>>> column
>>> >>>>> filter hint - this would subsume PARQUET-1698)
>>> >>>>
>>> >>>> I'm assuming this would be done at the RowGroupReader level, right?
>>> >>>
>>> >>> Ideally we'd be able to coalesce across row groups as well, though
>>> >>> maybe it'd be easier to start with within-row-group-only (I need to
>>> >>> familiarize myself with the reader more).
>>> >>>
>>> >>>> I don't understand what the "advantage" would be.  Can you
>>> >>>> elaborate?
>>> >>>
>>> >>> As Wes said, empirically you can get more bandwidth out of S3 with
>>> >>> multiple concurrent HTTP requests. There is a cost to doing so
>>> >>> (establishing a new connection takes time), hence why the coalescing
>>> >>> tries to group small reads (to fully utilize one connection) and
>>> >>> split
>>> >>> large reads (to be able to take advantage of multiple connections).
>>> >>
>>> >> If that's S3-specific (or even AWS-specific) it might better be done
>>> >> inside the S3 filesystem.  For other filesystems I don't think it
>>> >> makes
>>> >> sense to split reads.
>>> >>
>>> >> Regards
>>> >>
>>> >> Antoine.
>>> >>
>>> >
>>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Thanks. I've set up an AWS account for my own testing for now. I've
also submitted a PR to add a basic benchmark which can be run
self-contained, against a local Minio instance, or against S3:
https://github.com/apache/arrow/pull/6675

I ran the benchmark from my local machine, and I can test from EC2
sometime as well. Performance is not ideal, but I'm being limited by
my home internet connection - coalescing small chunked reads is (as
expected) as fast as reading the file in one go, and in the PR
(testing against localhost where we're not limited by bandwidth), it's
faster than either option.

--------------------------------------------------------------------------------------------------------------------
Benchmark                                            Time            CPU  Iterations
--------------------------------------------------------------------------------------------------------------------
MinioFixture/ReadAll1Mib/real_time           223416933 ns     9098743 ns         413  4.47594MB/s    4.47594 items/s
MinioFixture/ReadAll100Mib/real_time        6068938152 ns   553319299 ns          10  16.4773MB/s   0.164773 items/s
MinioFixture/ReadAll500Mib/real_time       30735046155 ns  2620718364 ns           2  16.2681MB/s  0.0325361 items/s
MinioFixture/ReadChunked100Mib/real_time    9625661666 ns   448637141 ns          12  10.3889MB/s   0.103889 items/s
MinioFixture/ReadChunked500Mib/real_time   58736796101 ns  2070237834 ns           2  8.51255MB/s  0.0170251 items/s
MinioFixture/ReadCoalesced100Mib/real_time  6982902546 ns    22553824 ns          10  14.3207MB/s   0.143207 items/s
MinioFixture/ReadCoalesced500Mib/real_time 29923239648 ns   112736805 ns           3  16.7094MB/s  0.0334188 items/s
MinioFixture/ReadParquet250K/real_time     21934689795 ns  2052758161 ns           3  9.90955MB/s  0.0455899 items/s
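
As a cross-check on how to read these figures, the short sketch below
recomputes throughput from the reported wall-clock times. It assumes
that "100Mib" and "500Mib" in the benchmark names denote payloads of
100 and 500 MiB and that the MB/s column uses 1024-based units; neither
is stated in this message, though the reported values are consistent
with both assumptions.

```python
MIB = 1024 * 1024


def throughput_mib_per_s(size_bytes, real_time_ns):
    """Throughput in MiB/s given a payload size and wall-clock time in ns."""
    return (size_bytes / MIB) / (real_time_ns / 1e9)


# Wall-clock times copied from the benchmark output above.
read_all_100 = throughput_mib_per_s(100 * MIB, 6068938152)
chunked_500 = throughput_mib_per_s(500 * MIB, 58736796101)
coalesced_500 = throughput_mib_per_s(500 * MIB, 29923239648)

# Ratio of coalesced to chunked throughput for the 500 MiB case.
speedup = coalesced_500 / chunked_500

print(f"ReadAll100Mib:       {read_all_100:.4f} MiB/s")
print(f"ReadChunked500Mib:   {chunked_500:.4f} MiB/s")
print(f"ReadCoalesced500Mib: {coalesced_500:.4f} MiB/s")
print(f"coalesced / chunked: {speedup:.2f}x")
```

Since this run was limited by a home internet connection, only the
ratios between rows are meaningful, not the absolute rates.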

Best,
David


On 3/22/20, Wes McKinney <we...@gmail.com> wrote:
> On Thu, Mar 19, 2020 at 10:04 AM David Li <li...@gmail.com> wrote:
>>
>> > That's why it's important that we set ourselves up to do performance
>> > testing in a realistic environment in AWS rather than simulating it.
>>
>> For my clarification, what are the plans for this (if any)? I couldn't
>> find any prior discussion, though it sounds like the discussion around
>> cloud CI capacity would be one step towards this.
>>
>> In the short term we could make tests/benchmarks configurable to not
>> point at a Minio instance so individual developers can at least try
>> things.
>
> It probably makes sense to begin investing in somewhat portable
> tooling to assist with running S3-related unit tests and benchmarks
> inside AWS. This could include initial Parquet dataset generation and
> other things.
>
> As far as testing, I'm happy to pay for some AWS costs (within
> reason). AWS might be able to donate some credits to us also
>
>> Best,
>> David
>>
>> On 3/18/20, David Li <li...@gmail.com> wrote:
>> > For us it applies to S3-like systems, not only S3 itself, at least.
>> >
>> > It does make sense to limit it to some filesystems. The behavior would
>> > be opt-in at the Parquet reader level, so at the Datasets or
>> > Filesystem layer we can take care of enabling the flag for filesystems
>> > where it actually helps.
>> >
>> > I've filed these issues:
>> > - ARROW-8151 to benchmark S3File+Parquet
>> > (https://issues.apache.org/jira/browse/ARROW-8151)
>> > - ARROW-8152 to split large reads
>> > (https://issues.apache.org/jira/browse/ARROW-8152)
>> > - PARQUET-1820 to use a column filter hint with coalescing
>> > (https://issues.apache.org/jira/browse/PARQUET-1820)
>> >
>> > in addition to PARQUET-1698 which is just about pre-buffering the
>> > entire row group (which we can now do with ARROW-7995).
>> >
>> > Best,
>> > David
>> >
>> > On 3/18/20, Antoine Pitrou <an...@python.org> wrote:
>> >>
>> >> On 18/03/2020 at 18:30, David Li wrote:
>> >>>> Instead of S3, you can use the Slow streams and Slow filesystem
>> >>>> implementations.  It may better protect against varying external
>> >>>> conditions.
>> >>>
>> >>> I think we'd want several different benchmarks - we want to ensure we
>> >>> don't regress local filesystem performance, and we also want to
>> >>> measure in an actual S3 environment. It would also be good to measure
>> >>> S3-compatible systems like Google's.
>> >>>
>> >>>>> - Use the coalescing inside the Parquet reader (even without a column
>> >>>>> filter hint - this would subsume PARQUET-1698)
>> >>>>
>> >>>> I'm assuming this would be done at the RowGroupReader level, right?
>> >>>
>> >>> Ideally we'd be able to coalesce across row groups as well, though
>> >>> maybe it'd be easier to start with within-row-group-only (I need to
>> >>> familiarize myself with the reader more).
>> >>>
>> >>>> I don't understand what the "advantage" would be.  Can you elaborate?
>> >>>
>> >>> As Wes said, empirically you can get more bandwidth out of S3 with
>> >>> multiple concurrent HTTP requests. There is a cost to doing so
>> >>> (establishing a new connection takes time), hence why the coalescing
>> >>> tries to group small reads (to fully utilize one connection) and split
>> >>> large reads (to be able to take advantage of multiple connections).
>> >>
>> >> If that's S3-specific (or even AWS-specific) it might better be done
>> >> inside the S3 filesystem.  For other filesystems I don't think it makes
>> >> sense to split reads.
>> >>
>> >> Regards
>> >>
>> >> Antoine.
>> >>
>> >
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Thu, Mar 19, 2020 at 10:04 AM David Li <li...@gmail.com> wrote:
>
> > That's why it's important that we set ourselves up to do performance
> > testing in a realistic environment in AWS rather than simulating it.
>
> For my clarification, what are the plans for this (if any)? I couldn't
> find any prior discussion, though it sounds like the discussion around
> cloud CI capacity would be one step towards this.
>
> In the short term we could make tests/benchmarks configurable to not
> point at a Minio instance so individual developers can at least try
> things.

It probably makes sense to begin investing in somewhat portable
tooling to assist with running S3-related unit tests and benchmarks
inside AWS. This could include initial Parquet dataset generation and
other things.

As for testing, I'm happy to pay for some AWS costs (within
reason). AWS might also be able to donate some credits to us.

> Best,
> David
>
> On 3/18/20, David Li <li...@gmail.com> wrote:
> > For us it applies to S3-like systems, not only S3 itself, at least.
> >
> > It does make sense to limit it to some filesystems. The behavior would
> > be opt-in at the Parquet reader level, so at the Datasets or
> > Filesystem layer we can take care of enabling the flag for filesystems
> > where it actually helps.
> >
> > I've filed these issues:
> > - ARROW-8151 to benchmark S3File+Parquet
> > (https://issues.apache.org/jira/browse/ARROW-8151)
> > - ARROW-8152 to split large reads
> > (https://issues.apache.org/jira/browse/ARROW-8152)
> > - PARQUET-1820 to use a column filter hint with coalescing
> > (https://issues.apache.org/jira/browse/PARQUET-1820)
> >
> > in addition to PARQUET-1698 which is just about pre-buffering the
> > entire row group (which we can now do with ARROW-7995).
> >
> > Best,
> > David
> >
> > On 3/18/20, Antoine Pitrou <an...@python.org> wrote:
> >>
> >> On 18/03/2020 at 18:30, David Li wrote:
> >>>> Instead of S3, you can use the Slow streams and Slow filesystem
> >>>> implementations.  It may better protect against varying external
> >>>> conditions.
> >>>
> >>> I think we'd want several different benchmarks - we want to ensure we
> >>> don't regress local filesystem performance, and we also want to
> >>> measure in an actual S3 environment. It would also be good to measure
> >>> S3-compatible systems like Google's.
> >>>
> >>>>> - Use the coalescing inside the Parquet reader (even without a column
> >>>>> filter hint - this would subsume PARQUET-1698)
> >>>>
> >>>> I'm assuming this would be done at the RowGroupReader level, right?
> >>>
> >>> Ideally we'd be able to coalesce across row groups as well, though
> >>> maybe it'd be easier to start with within-row-group-only (I need to
> >>> familiarize myself with the reader more).
> >>>
> >>>> I don't understand what the "advantage" would be.  Can you elaborate?
> >>>
> >>> As Wes said, empirically you can get more bandwidth out of S3 with
> >>> multiple concurrent HTTP requests. There is a cost to doing so
> >>> (establishing a new connection takes time), hence why the coalescing
> >>> tries to group small reads (to fully utilize one connection) and split
> >>> large reads (to be able to take advantage of multiple connections).
> >>
> >> If that's S3-specific (or even AWS-specific) it might better be done
> >> inside the S3 filesystem.  For other filesystems I don't think it makes
> >> sense to split reads.
> >>
> >> Regards
> >>
> >> Antoine.
> >>
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
> That's why it's important that we set ourselves up to do performance
> testing in a realistic environment in AWS rather than simulating it.

For my clarification, what are the plans for this (if any)? I couldn't
find any prior discussion, though it sounds like the discussion around
cloud CI capacity would be one step towards this.

In the short term we could make tests/benchmarks configurable to not
point at a Minio instance so individual developers can at least try
things.

Best,
David

On 3/18/20, David Li <li...@gmail.com> wrote:
> For us it applies to S3-like systems, not only S3 itself, at least.
>
> It does make sense to limit it to some filesystems. The behavior would
> be opt-in at the Parquet reader level, so at the Datasets or
> Filesystem layer we can take care of enabling the flag for filesystems
> where it actually helps.
>
> I've filed these issues:
> - ARROW-8151 to benchmark S3File+Parquet
> (https://issues.apache.org/jira/browse/ARROW-8151)
> - ARROW-8152 to split large reads
> (https://issues.apache.org/jira/browse/ARROW-8152)
> - PARQUET-1820 to use a column filter hint with coalescing
> (https://issues.apache.org/jira/browse/PARQUET-1820)
>
> in addition to PARQUET-1698 which is just about pre-buffering the
> entire row group (which we can now do with ARROW-7995).
>
> Best,
> David
>
> On 3/18/20, Antoine Pitrou <an...@python.org> wrote:
>>
>> On 18/03/2020 at 18:30, David Li wrote:
>>>> Instead of S3, you can use the Slow streams and Slow filesystem
>>>> implementations.  It may better protect against varying external
>>>> conditions.
>>>
>>> I think we'd want several different benchmarks - we want to ensure we
>>> don't regress local filesystem performance, and we also want to
>>> measure in an actual S3 environment. It would also be good to measure
>>> S3-compatible systems like Google's.
>>>
>>>>> - Use the coalescing inside the Parquet reader (even without a column
>>>>> filter hint - this would subsume PARQUET-1698)
>>>>
>>>> I'm assuming this would be done at the RowGroupReader level, right?
>>>
>>> Ideally we'd be able to coalesce across row groups as well, though
>>> maybe it'd be easier to start with within-row-group-only (I need to
>>> familiarize myself with the reader more).
>>>
>>>> I don't understand what the "advantage" would be.  Can you elaborate?
>>>
>>> As Wes said, empirically you can get more bandwidth out of S3 with
>>> multiple concurrent HTTP requests. There is a cost to doing so
>>> (establishing a new connection takes time), hence why the coalescing
>>> tries to group small reads (to fully utilize one connection) and split
>>> large reads (to be able to take advantage of multiple connections).
>>
>> If that's S3-specific (or even AWS-specific) it might better be done
>> inside the S3 filesystem.  For other filesystems I don't think it makes
>> sense to split reads.
>>
>> Regards
>>
>> Antoine.
>>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
For us, at least, it applies to S3-like systems, not only S3 itself.

It does make sense to limit it to some filesystems. The behavior would
be opt-in at the Parquet reader level, so at the Datasets or
Filesystem layer we can take care of enabling the flag for filesystems
where it actually helps.

I've filed these issues:
- ARROW-8151 to benchmark S3File+Parquet
(https://issues.apache.org/jira/browse/ARROW-8151)
- ARROW-8152 to split large reads
(https://issues.apache.org/jira/browse/ARROW-8152)
- PARQUET-1820 to use a column filter hint with coalescing
(https://issues.apache.org/jira/browse/PARQUET-1820)

in addition to PARQUET-1698 which is just about pre-buffering the
entire row group (which we can now do with ARROW-7995).

Best,
David

On 3/18/20, Antoine Pitrou <an...@python.org> wrote:
>
> On 18/03/2020 at 18:30, David Li wrote:
>>> Instead of S3, you can use the Slow streams and Slow filesystem
>>> implementations.  It may better protect against varying external
>>> conditions.
>>
>> I think we'd want several different benchmarks - we want to ensure we
>> don't regress local filesystem performance, and we also want to
>> measure in an actual S3 environment. It would also be good to measure
>> S3-compatible systems like Google's.
>>
>>>> - Use the coalescing inside the Parquet reader (even without a column
>>>> filter hint - this would subsume PARQUET-1698)
>>>
>>> I'm assuming this would be done at the RowGroupReader level, right?
>>
>> Ideally we'd be able to coalesce across row groups as well, though
>> maybe it'd be easier to start with within-row-group-only (I need to
>> familiarize myself with the reader more).
>>
>>> I don't understand what the "advantage" would be.  Can you elaborate?
>>
>> As Wes said, empirically you can get more bandwidth out of S3 with
>> multiple concurrent HTTP requests. There is a cost to doing so
>> (establishing a new connection takes time), hence why the coalescing
>> tries to group small reads (to fully utilize one connection) and split
>> large reads (to be able to take advantage of multiple connections).
>
> If that's S3-specific (or even AWS-specific) it might better be done
> inside the S3 filesystem.  For other filesystems I don't think it makes
> sense to split reads.
>
> Regards
>
> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 18/03/2020 at 18:30, David Li wrote:
>> Instead of S3, you can use the Slow streams and Slow filesystem
>> implementations.  It may better protect against varying external
>> conditions.
> 
> I think we'd want several different benchmarks - we want to ensure we
> don't regress local filesystem performance, and we also want to
> measure in an actual S3 environment. It would also be good to measure
> S3-compatible systems like Google's.
> 
>>> - Use the coalescing inside the Parquet reader (even without a column
>>> filter hint - this would subsume PARQUET-1698)
>>
>> I'm assuming this would be done at the RowGroupReader level, right?
> 
> Ideally we'd be able to coalesce across row groups as well, though
> maybe it'd be easier to start with within-row-group-only (I need to
> familiarize myself with the reader more).
> 
>> I don't understand what the "advantage" would be.  Can you elaborate?
> 
> As Wes said, empirically you can get more bandwidth out of S3 with
> multiple concurrent HTTP requests. There is a cost to doing so
> (establishing a new connection takes time), hence why the coalescing
> tries to group small reads (to fully utilize one connection) and split
> large reads (to be able to take advantage of multiple connections).

If that's S3-specific (or even AWS-specific), it might be better done
inside the S3 filesystem.  For other filesystems I don't think it makes
sense to split reads.

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
> Instead of S3, you can use the Slow streams and Slow filesystem
> implementations.  It may better protect against varying external
> conditions.

I think we'd want several different benchmarks - we want to ensure we
don't regress local filesystem performance, and we also want to
measure in an actual S3 environment. It would also be good to measure
S3-compatible systems like Google's.

>> - Use the coalescing inside the Parquet reader (even without a column
>> filter hint - this would subsume PARQUET-1698)
>
> I'm assuming this would be done at the RowGroupReader level, right?

Ideally we'd be able to coalesce across row groups as well, though
maybe it'd be easier to start with within-row-group-only (I need to
familiarize myself with the reader more).

> I don't understand what the "advantage" would be.  Can you elaborate?

As Wes said, empirically you can get more bandwidth out of S3 with
multiple concurrent HTTP requests. There is a cost to doing so
(establishing a new connection takes time), hence why the coalescing
tries to group small reads (to fully utilize one connection) and split
large reads (to be able to take advantage of multiple connections).
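
To make the group-small/split-large idea concrete, here is a minimal
sketch in Python. It is not the Arrow implementation; the function name
and the two tuning parameters (hole_size_limit, range_size_limit) are
assumptions chosen for illustration, and good values would depend on
the storage backend.

```python
def coalesce(ranges, hole_size_limit, range_size_limit):
    """Merge nearby (offset, length) ranges, then split oversized ones.

    hole_size_limit and range_size_limit are hypothetical tuning knobs.
    """
    if not ranges:
        return []
    ordered = sorted(ranges)

    # Pass 1: merge ranges whose gap is small enough that reading the
    # hole is cheaper than issuing a separate request.
    merged = []
    cur_start, cur_len = ordered[0]
    cur_end = cur_start + cur_len
    for start, length in ordered[1:]:
        end = start + length
        if start - cur_end <= hole_size_limit:
            cur_end = max(cur_end, end)
        else:
            merged.append((cur_start, cur_end))
            cur_start, cur_end = start, end
    merged.append((cur_start, cur_end))

    # Pass 2: split large ranges so they can be fetched over several
    # connections in parallel.
    result = []
    for start, end in merged:
        while end - start > range_size_limit:
            result.append((start, range_size_limit))
            start += range_size_limit
        if end > start:
            result.append((start, end - start))
    return result


requests = coalesce(
    [(0, 100), (110, 50), (10_000, 5_000)],
    hole_size_limit=64,
    range_size_limit=2_048,
)
```

Here the first two ranges are 10 bytes apart, so they become one
request, while the 5000-byte range is cut into pieces of at most 2048
bytes that could be issued concurrently.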

I will file issues and link them to ARROW-7995. Since there was
interest around PARQUET-1698, hopefully breaking up the tasks will
make it easier for everyone involved to collaborate.

Thanks,
David

On 3/18/20, Wes McKinney <we...@gmail.com> wrote:
> On Wed, Mar 18, 2020 at 11:42 AM Antoine Pitrou <an...@python.org> wrote:
>>
>>
>> On 18/03/2020 at 17:36, David Li wrote:
>> > Hi all,
>> >
>> > Thanks to Antoine for implementing the core read coalescing logic.
>> >
>> > We've taken a look at what else needs to be done to get this working,
>> > and it sounds like the following changes would be worthwhile,
>> > independent of the rest of the optimizations we discussed:
>> >
>> > - Add benchmarks of the current Parquet reader with the current S3File
>> > (and other file implementations) so we can track
>> > improvements/regressions
>>
>> Instead of S3, you can use the Slow streams and Slow filesystem
>> implementations.  It may better protect against varying external
>> conditions.
>>
>> > - Use the coalescing inside the Parquet reader (even without a column
>> > filter hint - this would subsume PARQUET-1698)
>>
>> I'm assuming this would be done at the RowGroupReader level, right?
>>
>> > - In coalescing, split large read ranges into smaller ones (this would
>> > further improve on PARQUET-1698 by taking advantage of parallel reads)
>>
>> I don't understand what the "advantage" would be.  Can you elaborate?
>
> Empirically it is known to S3 users that parallelizing reads improves
> throughput. I think it has to do with the way that Amazon's
> infrastructure works. That's why it's important that we set ourselves
> up to do performance testing in a realistic environment in AWS rather
> than simulating it.
>
>> Regards
>>
>> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Wed, Mar 18, 2020 at 11:42 AM Antoine Pitrou <an...@python.org> wrote:
>
>
> On 18/03/2020 at 17:36, David Li wrote:
> > Hi all,
> >
> > Thanks to Antoine for implementing the core read coalescing logic.
> >
> > We've taken a look at what else needs to be done to get this working,
> > and it sounds like the following changes would be worthwhile,
> > independent of the rest of the optimizations we discussed:
> >
> > - Add benchmarks of the current Parquet reader with the current S3File
> > (and other file implementations) so we can track
> > improvements/regressions
>
> Instead of S3, you can use the Slow streams and Slow filesystem
> implementations.  It may better protect against varying external conditions.
>
> > - Use the coalescing inside the Parquet reader (even without a column
> > filter hint - this would subsume PARQUET-1698)
>
> I'm assuming this would be done at the RowGroupReader level, right?
>
> > - In coalescing, split large read ranges into smaller ones (this would
> > further improve on PARQUET-1698 by taking advantage of parallel reads)
>
> I don't understand what the "advantage" would be.  Can you elaborate?

Empirically it is known to S3 users that parallelizing reads improves
throughput. I think it has to do with the way that Amazon's
infrastructure works. That's why it's important that we set ourselves
up to do performance testing in a realistic environment in AWS rather
than simulating it.

> Regards
>
> Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 18/03/2020 at 17:36, David Li wrote:
> Hi all,
> 
> Thanks to Antoine for implementing the core read coalescing logic.
> 
> We've taken a look at what else needs to be done to get this working,
> and it sounds like the following changes would be worthwhile,
> independent of the rest of the optimizations we discussed:
> 
> - Add benchmarks of the current Parquet reader with the current S3File
> (and other file implementations) so we can track
> improvements/regressions

Instead of S3, you can use the Slow streams and Slow filesystem
implementations.  It may better protect against varying external conditions.

> - Use the coalescing inside the Parquet reader (even without a column
> filter hint - this would subsume PARQUET-1698)

I'm assuming this would be done at the RowGroupReader level, right?

> - In coalescing, split large read ranges into smaller ones (this would
> further improve on PARQUET-1698 by taking advantage of parallel reads)

I don't understand what the "advantage" would be.  Can you elaborate?

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Hi all,

Thanks to Antoine for implementing the core read coalescing logic.

We've taken a look at what else needs to be done to get this working,
and it sounds like the following changes would be worthwhile,
independent of the rest of the optimizations we discussed:

- Add benchmarks of the current Parquet reader with the current S3File
(and other file implementations) so we can track
improvements/regressions
- Use the coalescing inside the Parquet reader (even without a column
filter hint - this would subsume PARQUET-1698)
- In coalescing, split large read ranges into smaller ones (this would
further improve on PARQUET-1698 by taking advantage of parallel reads)
- Accept a column filter hint in the Parquet reader and use that to
compute read ranges
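
As an illustration of the last item, here is a minimal Python sketch of
turning a column filter hint into read ranges. The metadata layout (one
dict per row group, mapping column name to an (offset, length) pair) and
the function name are assumptions made for the example; they are not the
Parquet reader's actual structures.

```python
def column_read_ranges(row_groups, wanted_columns):
    """Return sorted (offset, length) ranges for the requested column chunks."""
    wanted = set(wanted_columns)
    ranges = []
    for row_group in row_groups:
        for name, (offset, length) in row_group.items():
            if name in wanted:
                ranges.append((offset, length))
    return sorted(ranges)


# Toy footer (hypothetical): two row groups with three column chunks each.
footer = [
    {"a": (4, 100), "b": (104, 50), "c": (154, 200)},
    {"a": (354, 100), "b": (454, 50), "c": (504, 200)},
]
hinted = column_read_ranges(footer, ["a", "b"])
```

The resulting ranges would then be handed to the coalescing logic, which
can merge the adjacent chunks for "a" and "b" within each row group.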

Does this sound reasonable?

Thanks,
David

On 2/6/20, David Li <li...@gmail.com> wrote:
> Catching up on questions here...
>
>> Typically you can solve this by having enough IO concurrency at once :-)
>> I'm not sure having sophisticated global coordination (based on which
>> algorithms) would bring anything.  Would you care to elaborate?
>
> We aren't proposing *sophisticated* global coordination, rather, just
> using a global pool with a global limit, so that a user doesn't
> unintentionally start hundreds of requests in parallel, and so that
> you can adjust the resource consumption/performance tradeoff.
>
> Essentially, what our library does is maintain two pools (for I/O):
> - One pool produces I/O requests, by going through the list of files,
> fetching the Parquet footers, and queuing up I/O requests on the main
> pool. (This uses a pool so we can fetch and parse metadata from
> multiple Parquet files at once.)
> - One pool serves I/O requests, by fetching chunks and placing them in
> buffers inside the file object implementation.
>
> The global concurrency manager additionally limits the second pool by
> not servicing I/O requests for a file until all of the I/O requests
> for previous files have at least started. (By just having lots of
> concurrency, you might end up starving yourself by reading data you
> don't want quite yet.)
>
> Additionally, the global pool could still be a win for non-Parquet
> files - an implementation can at least submit, say, an entire CSV file
> as a "chunk" and have it read in the background.
>
>> Actually, on a more high-level basis, is the goal to prefetch for
>> sequential consumption of row groups?
>
> At least for us, our query pattern is to sequentially consume row
> groups from a large dataset, where we select a subset of columns and a
> subset of the partition key range (usually time range). Prefetching
> speeds this up substantially, or in general, pipelining discovery of
> files, I/O, and deserialization.
>
>> There are no situations where you would want to consume a scattered
>> subset of row groups (e.g. predicate pushdown)?
>
> With coalescing, this "automatically" gets optimized. If you happen to
> need column chunks from separate row groups that are adjacent or close
> on-disk, coalescing will still fetch them in a single IO call.
>
> We found that having large row groups was more beneficial than small
> row groups, since when you combine small row groups with column
> selection, you end up with a lot of small non-adjacent column chunks -
> which coalescing can't help with. The exact tradeoff depends on the
> dataset and workload, of course.
>
>> This seems like too much to try to build into RandomAccessFile. I would
>> suggest a class that wraps a random access file and manages cached segments
>> and their lifetimes through explicit APIs.
>
> A wrapper class seems ideal, especially as the logic is agnostic to
> the storage backend (except for some parameters which can either be
> hand-tuned or estimated on the fly). It also keeps the scope of the
> changes down.
>
>> Where to put the "async multiple range request" API is a separate question,
>> though. Probably makes sense to start writing some working code and sort it
>> out there.
>
> We haven't looked in this direction much. Our designs are based around
> thread pools partly because we wanted to avoid modifying the Parquet
> and Arrow internals, instead choosing to modify the I/O layer to "keep
> Parquet fed" as quickly as possible.
>
> Overall, I recall there's an issue open for async APIs in
> Arrow...perhaps we want to move that to a separate discussion, or on
> the contrary, explore some experimental APIs here to inform the
> overall design.
>
> Thanks,
> David
>
> On 2/6/20, Wes McKinney <we...@gmail.com> wrote:
>> On Thu, Feb 6, 2020 at 1:30 PM Antoine Pitrou <an...@python.org> wrote:
>>>
>>>
>>> On 06/02/2020 at 20:20, Wes McKinney wrote:
>>> >> Actually, on a more high-level basis, is the goal to prefetch for
>>> >> sequential consumption of row groups?
>>> >>
>>> >
>>> > Essentially yes. One "easy" optimization is to prefetch the entire
>>> > serialized row group. This is an evolution of that idea where we want to
>>> > prefetch only the needed parts of a row group in a minimum number of IO
>>> > calls (consider reading the first 10 columns from a file with 1000 columns
>>> > -- so we want to do one IO call instead of 10 like we do now).
>>>
>>> There are no situations where you would want to consume a scattered
>>> subset of row groups (e.g. predicate pushdown)?
>>
>> There are. If it can be demonstrated that there are performance gains
>> resulting from IO optimizations involving multiple row groups then I
>> see no reason not to implement them.
>>
>>> Regards
>>>
>>> Antoine.
>>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Catching up on questions here...

> Typically you can solve this by having enough IO concurrency at once :-)
> I'm not sure having sophisticated global coordination (based on which
> algorithms) would bring anything.  Would you care to elaborate?

We aren't proposing *sophisticated* global coordination, just a
global pool with a global limit, so that a user doesn't
unintentionally start hundreds of requests in parallel, and so that
you can adjust the resource consumption/performance tradeoff.

Essentially, what our library does is maintain two pools (for I/O):
- One pool produces I/O requests, by going through the list of files,
fetching the Parquet footers, and queuing up I/O requests on the main
pool. (This uses a pool so we can fetch and parse metadata from
multiple Parquet files at once.)
- One pool serves I/O requests, by fetching chunks and placing them in
buffers inside the file object implementation.

The global concurrency manager additionally limits the second pool by
not servicing I/O requests for a file until all of the I/O requests
for previous files have at least started. (By just having lots of
concurrency, you might end up starving yourself by reading data you
don't want quite yet.)
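
A rough Python sketch of the two-pool arrangement, for illustration
only. read_footer and fetch_chunk are hypothetical stand-ins for the
real metadata and ranged-read calls, and the pool sizes are arbitrary.
The ordering constraint is approximated by submitting I/O requests in
file order and relying on the executor's FIFO work queue, which is
simpler than the concurrency manager described above.

```python
from concurrent.futures import ThreadPoolExecutor


def read_footer(path):
    """Hypothetical stand-in: return the (offset, length) chunks needed from path."""
    return [(0, 4), (4, 4)]


def fetch_chunk(path, offset, length):
    """Hypothetical stand-in for a ranged GET against object storage."""
    return (path, offset, b"\0" * length)


def prefetch(paths, max_metadata_threads=2, max_io_threads=4):
    # The pool sizes act as the global limits on parallelism.
    with ThreadPoolExecutor(max_metadata_threads) as metadata_pool, \
            ThreadPoolExecutor(max_io_threads) as io_pool:
        # Pool 1: fetch and parse footers for several files at once.
        footers = [metadata_pool.submit(read_footer, p) for p in paths]

        # Pool 2: queue chunk reads in file order, so a later file's
        # requests are not dequeued before an earlier file's requests.
        pending = []
        for path, footer in zip(paths, footers):
            for offset, length in footer.result():
                pending.append(io_pool.submit(fetch_chunk, path, offset, length))
        return [f.result() for f in pending]


chunks = prefetch(["a.parquet", "b.parquet"])
```

A real implementation would hand buffers to the file object as they
arrive instead of collecting them all at the end.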

Additionally, the global pool could still be a win for non-Parquet
files - an implementation can at least submit, say, an entire CSV file
as a "chunk" and have it read in the background.

> Actually, on a more high-level basis, is the goal to prefetch for
> sequential consumption of row groups?

At least for us, our query pattern is to sequentially consume row
groups from a large dataset, where we select a subset of columns and a
subset of the partition key range (usually time range). Prefetching
speeds this up substantially, as does, more generally, pipelining the
discovery of files, I/O, and deserialization.

> There are no situations where you would want to consume a scattered
> subset of row groups (e.g. predicate pushdown)?

With coalescing, this "automatically" gets optimized. If you happen to
need column chunks from separate row groups that are adjacent or close
on-disk, coalescing will still fetch them in a single IO call.

We found that having large row groups was more beneficial than small
row groups, since when you combine small row groups with column
selection, you end up with a lot of small non-adjacent column chunks -
which coalescing can't help with. The exact tradeoff depends on the
dataset and workload, of course.

> This seems like too much to try to build into RandomAccessFile. I would
> suggest a class that wraps a random access file and manages cached segments
> and their lifetimes through explicit APIs.

A wrapper class seems ideal, especially as the logic is agnostic to
the storage backend (except for some parameters which can either be
hand-tuned or estimated on the fly). It also keeps the scope of the
changes down.
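
A minimal Python sketch of such a wrapper, assuming only a seekable
file-like object underneath. The class and method names (cache,
read_at, release) are hypothetical and are not an existing Arrow API;
lifetimes are handled by an explicit release call.

```python
import io


class CachedRangeReader:
    """Wraps a seekable file and serves reads from explicitly cached segments."""

    def __init__(self, raw):
        self._raw = raw
        self._segments = {}  # (offset, length) -> bytes

    def cache(self, ranges):
        # Fetch each range up front; a real version would do this in the
        # background and coalesce the ranges first.
        for offset, length in ranges:
            self._raw.seek(offset)
            self._segments[(offset, length)] = self._raw.read(length)

    def read_at(self, offset, length):
        # Serve from a cached segment that fully contains the request.
        for (seg_off, seg_len), data in self._segments.items():
            if seg_off <= offset and offset + length <= seg_off + seg_len:
                start = offset - seg_off
                return data[start:start + length]
        # Otherwise fall back to the underlying file.
        self._raw.seek(offset)
        return self._raw.read(length)

    def release(self, offset, length):
        # Explicitly drop a segment once the consumer is done with it.
        self._segments.pop((offset, length), None)


reader = CachedRangeReader(io.BytesIO(bytes(range(100))))
reader.cache([(10, 20)])
cached = reader.read_at(12, 3)
reader.release(10, 20)
uncached = reader.read_at(50, 2)
```

Keeping this outside RandomAccessFile means the Parquet reader only
needs to know about read_at, while the caller decides what to cache and
when to free it.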

> Where to put the "async multiple range request" API is a separate question,
> though. Probably makes sense to start writing some working code and sort it
> out there.

We haven't looked in this direction much. Our designs are based around
thread pools partly because we wanted to avoid modifying the Parquet
and Arrow internals, instead choosing to modify the I/O layer to "keep
Parquet fed" as quickly as possible.

Overall, I recall there's an open issue for async APIs in Arrow.
Perhaps we want to move that to a separate discussion or, on the
contrary, explore some experimental APIs here to inform the overall
design.

Thanks,
David

On 2/6/20, Wes McKinney <we...@gmail.com> wrote:
> On Thu, Feb 6, 2020 at 1:30 PM Antoine Pitrou <an...@python.org> wrote:
>>
>>
>> On 06/02/2020 at 20:20, Wes McKinney wrote:
>> >> Actually, on a more high-level basis, is the goal to prefetch for
>> >> sequential consumption of row groups?
>> >>
>> >
>> > Essentially yes. One "easy" optimization is to prefetch the entire
>> > serialized row group. This is an evolution of that idea where we want to
>> > prefetch only the needed parts of a row group in a minimum number of IO
>> > calls (consider reading the first 10 columns from a file with 1000 columns
>> > -- so we want to do one IO call instead of 10 like we do now).
>>
>> There are no situations where you would want to consume a scattered
>> subset of row groups (e.g. predicate pushdown)?
>
> There are. If it can be demonstrated that there are performance gains
> resulting from IO optimizations involving multiple row groups then I
> see no reason not to implement them.
>
>> Regards
>>
>> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Thu, Feb 6, 2020 at 1:30 PM Antoine Pitrou <an...@python.org> wrote:
>
>
> On 06/02/2020 at 20:20, Wes McKinney wrote:
> >> Actually, on a more high-level basis, is the goal to prefetch for
> >> sequential consumption of row groups?
> >>
> >
> > Essentially yes. One "easy" optimization is to prefetch the entire
> > serialized row group. This is an evolution of that idea where we want to
> > prefetch only the needed parts of a row group in a minimum number of IO
> > calls (consider reading the first 10 columns from a file with 1000 columns
> > -- so we want to do one IO call instead of 10 like we do now).
>
> There are no situations where you would want to consume a scattered
> subset of row groups (e.g. predicate pushdown)?

There are. If it can be demonstrated that there are performance gains
resulting from IO optimizations involving multiple row groups then I
see no reason not to implement them.

> Regards
>
> Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 06/02/2020 at 20:20, Wes McKinney wrote:
>> Actually, on a more high-level basis, is the goal to prefetch for
>> sequential consumption of row groups?
>>
> 
> Essentially yes. One "easy" optimization is to prefetch the entire
> serialized row group. This is an evolution of that idea where we want to
> prefetch only the needed parts of a row group in a minimum number of IO
> calls (consider reading the first 10 columns from a file with 1000 columns
> -- so we want to do one IO call instead of 10 like we do now).

There are no situations where you would want to consume a scattered
subset of row groups (e.g. predicate pushdown)?

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Thu, Feb 6, 2020, 12:42 PM Antoine Pitrou <an...@python.org> wrote:

>
> On 06/02/2020 at 19:40, Antoine Pitrou wrote:
> >
> > On 06/02/2020 at 19:37, Wes McKinney wrote:
> >> On Thu, Feb 6, 2020, 12:12 PM Antoine Pitrou <an...@python.org> wrote:
> >>
> >>> On 06/02/2020 at 16:26, Wes McKinney wrote:
> >>>>
> >>>> This seems useful, too. It becomes a question of where do you want to
> >>>> manage the cached memory segments, however you obtain them. I'm
> >>>> arguing that we should not have much custom code in the Parquet
> >>>> library to manage the prefetched segments (and provide the correct
> >>>> buffer slice to each column reader when they need it), and instead
> >>>> encapsulate this logic so it can be reused.
> >>>
> >>> I see, so RandomAccessFile would have some associative caching logic to
> >>> find whether the exact requested range was cached and then return it to
> >>> the caller?  That sounds doable.  How is lifetime handled then?  Are
> >>> cached buffers kept on the RandomAccessFile until they are requested, at
> >>> which point their ownership is transferred to the caller?
> >>>
> >>
> >> This seems like too much to try to build into RandomAccessFile. I would
> >> suggest a class that wraps a random access file and manages cached segments
> >> and their lifetimes through explicit APIs.
> >
> > So Parquet would expect to receive that class rather than
> > RandomAccessFile?  Or it would grow separate paths for it?
>
> Actually, on a more high-level basis, is the goal to prefetch for
> sequential consumption of row groups?
>

Essentially yes. One "easy" optimization is to prefetch the entire
serialized row group. This is an evolution of that idea where we want to
prefetch only the needed parts of a row group in a minimum number of IO
calls (consider reading the first 10 columns from a file with 1000 columns
-- so we want to do one IO call instead of 10 like we do now).
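
As a rough illustration of the coalescing idea described above, the
following sketch merges nearby byte ranges so that adjacent column chunks
can be fetched with fewer IO calls. The names ByteRange, CoalesceRanges and
hole_size_limit are hypothetical and are not part of any existing Arrow API;
this is a minimal sketch under those assumptions, not a proposed
implementation.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical byte range, mirroring the {offset, length} struct
// discussed elsewhere in this thread.
struct ByteRange {
  int64_t offset;
  int64_t length;
};

// Merge ranges whose gap is at most `hole_size_limit` bytes, so that
// neighbouring column chunks end up in a single read. The limit is an
// assumed tuning knob that would depend on the storage backend.
std::vector<ByteRange> CoalesceRanges(std::vector<ByteRange> ranges,
                                      int64_t hole_size_limit) {
  std::sort(ranges.begin(), ranges.end(),
            [](const ByteRange& a, const ByteRange& b) {
              return a.offset < b.offset;
            });
  std::vector<ByteRange> out;
  for (const ByteRange& r : ranges) {
    if (r.length <= 0) continue;  // skip empty ranges
    if (!out.empty()) {
      ByteRange& last = out.back();
      const int64_t last_end = last.offset + last.length;
      if (r.offset - last_end <= hole_size_limit) {
        // Close enough (or overlapping): extend the previous range.
        const int64_t new_end = std::max(last_end, r.offset + r.length);
        last.length = new_end - last.offset;
        continue;
      }
    }
    out.push_back(r);
  }
  return out;
}
```

With a small hole_size_limit, ten contiguous column chunks collapse into one
range, while a chunk far away in the file stays a separate read.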



> Regards
>
> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 06/02/2020 at 19:40, Antoine Pitrou wrote:
> 
> On 06/02/2020 at 19:37, Wes McKinney wrote:
>> On Thu, Feb 6, 2020, 12:12 PM Antoine Pitrou <an...@python.org> wrote:
>>
>>> On 06/02/2020 at 16:26, Wes McKinney wrote:
>>>>
>>>> This seems useful, too. It becomes a question of where do you want to
>>>> manage the cached memory segments, however you obtain them. I'm
>>>> arguing that we should not have much custom code in the Parquet
>>>> library to manage the prefetched segments (and providing the correct
>>>> buffer slice to each column reader when they need it), and instead
>>>> encapsulate this logic so it can be reused.
>>>
>>> I see, so RandomAccessFile would have some associative caching logic to
>>> find whether the exact requested range was cached and then return it to
>>> the caller?  That sounds doable.  How is lifetime handled then?  Are
>>> cached buffers kept on the RandomAccessFile until they are requested, at
>>> which point their ownership is transferred to the caller?
>>>
>>
>> This seems like too much to try to build into RandomAccessFile. I would
>> suggest a class that wraps a random access file and manages cached segments
>> and their lifetimes through explicit APIs.
> 
> So Parquet would expect to receive that class rather than
> RandomAccessFile?  Or it would grow separate paths for it?

Actually, on a more high-level basis, is the goal to prefetch for
sequential consumption of row groups?

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 06/02/2020 at 19:37, Wes McKinney wrote:
> On Thu, Feb 6, 2020, 12:12 PM Antoine Pitrou <an...@python.org> wrote:
> 
>> On 06/02/2020 at 16:26, Wes McKinney wrote:
>>>
>>> This seems useful, too. It becomes a question of where do you want to
>>> manage the cached memory segments, however you obtain them. I'm
>>> arguing that we should not have much custom code in the Parquet
>>> library to manage the prefetched segments (and providing the correct
>>> buffer slice to each column reader when they need it), and instead
>>> encapsulate this logic so it can be reused.
>>
>> I see, so RandomAccessFile would have some associative caching logic to
>> find whether the exact requested range was cached and then return it to
>> the caller?  That sounds doable.  How is lifetime handled then?  Are
>> cached buffers kept on the RandomAccessFile until they are requested, at
>> which point their ownership is transferred to the caller?
>>
> 
> This seems like too much to try to build into RandomAccessFile. I would
> suggest a class that wraps a random access file and manages cached segments
> and their lifetimes through explicit APIs.

So Parquet would expect to receive that class rather than
RandomAccessFile?  Or it would grow separate paths for it?

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Thu, Feb 6, 2020, 12:12 PM Antoine Pitrou <an...@python.org> wrote:

>
> On 06/02/2020 at 16:26, Wes McKinney wrote:
> >
> > This seems useful, too. It becomes a question of where do you want to
> > manage the cached memory segments, however you obtain them. I'm
> > arguing that we should not have much custom code in the Parquet
> > library to manage the prefetched segments (and providing the correct
> > buffer slice to each column reader when they need it), and instead
> > encapsulate this logic so it can be reused.
>
> I see, so RandomAccessFile would have some associative caching logic to
> find whether the exact requested range was cached and then return it to
> the caller?  That sounds doable.  How is lifetime handled then?  Are
> cached buffers kept on the RandomAccessFile until they are requested, at
> which point their ownership is transferred to the caller?
>

This seems like too much to try to build into RandomAccessFile. I would
suggest a class that wraps a random access file and manages cached segments
and their lifetimes through explicit APIs.

Where to put the "async multiple range request" API is a separate question,
though. Probably makes sense to start writing some working code and sort it
out there.
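
To make the wrapper idea above a little more concrete, here is a minimal
sketch of a class that wraps a file, caches requested segments, and exposes
explicit calls for managing their lifetime. SimpleFile is a hypothetical
one-method stand-in for RandomAccessFile, and CachingFile, CacheRanges,
ReadAt and ClearCache are assumed names for illustration only; the
synchronous CacheRanges here sidesteps the async question entirely.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for RandomAccessFile, reduced to one method.
class SimpleFile {
 public:
  virtual ~SimpleFile() = default;
  virtual std::string ReadAt(int64_t offset, int64_t length) = 0;
};

// In-memory file that counts how many reads reach it.
class CountingMemoryFile : public SimpleFile {
 public:
  explicit CountingMemoryFile(std::string data) : data_(std::move(data)) {}
  std::string ReadAt(int64_t offset, int64_t length) override {
    ++num_reads;
    return data_.substr(static_cast<std::size_t>(offset),
                        static_cast<std::size_t>(length));
  }
  int num_reads = 0;

 private:
  std::string data_;
};

// Wrapper that owns cached segments, keyed by their start offset.
class CachingFile : public SimpleFile {
 public:
  explicit CachingFile(std::shared_ptr<SimpleFile> wrapped)
      : wrapped_(std::move(wrapped)) {}

  // Read each (offset, length) range once and keep the bytes.
  void CacheRanges(const std::vector<std::pair<int64_t, int64_t>>& ranges) {
    for (const auto& r : ranges) {
      cache_[r.first] = wrapped_->ReadAt(r.first, r.second);
    }
  }

  // Serve the request from a cached segment that fully contains it;
  // otherwise fall through to the wrapped file.
  std::string ReadAt(int64_t offset, int64_t length) override {
    auto it = cache_.upper_bound(offset);
    if (it != cache_.begin()) {
      --it;  // last segment starting at or before `offset`
      const int64_t seg_start = it->first;
      const int64_t seg_end =
          seg_start + static_cast<int64_t>(it->second.size());
      if (offset + length <= seg_end) {
        return it->second.substr(static_cast<std::size_t>(offset - seg_start),
                                 static_cast<std::size_t>(length));
      }
    }
    return wrapped_->ReadAt(offset, length);
  }

  // Explicit lifetime control: drop all cached segments.
  void ClearCache() { cache_.clear(); }

 private:
  std::shared_ptr<SimpleFile> wrapped_;
  std::map<int64_t, std::string> cache_;
};
```

Because the wrapper implements the same interface as the file it wraps, a
reader could in principle accept either one, which is one possible answer to
the "separate paths" question raised in the quoted text.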


> Regards
>
> Antoine.
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 06/02/2020 at 16:26, Wes McKinney wrote:
> 
> This seems useful, too. It becomes a question of where do you want to
> manage the cached memory segments, however you obtain them. I'm
> arguing that we should not have much custom code in the Parquet
> library to manage the prefetched segments (and providing the correct
> buffer slice to each column reader when they need it), and instead
> encapsulate this logic so it can be reused.

I see, so RandomAccessFile would have some associative caching logic to
find whether the exact requested range was cached and then return it to
the caller?  That sounds doable.  How is lifetime handled then?  Are
cached buffers kept on the RandomAccessFile until they are requested, at
which point their ownership is transferred to the caller?

Regards

Antoine.

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
On 06/02/2020 at 17:07, Wes McKinney wrote:
> In case folks are interested in how some other systems deal with IO
> management / scheduling, the comments in
> 
> https://github.com/apache/impala/blob/master/be/src/runtime/io/disk-io-mgr.h
> 
> and related files might be interesting

Thanks.  There's quite a lot of functionality.  It would be useful to
discuss which parts of that functionality are desirable, and which are
not.  For example, I don't think we should spend development time
writing a complex IO scheduler (using which heuristics?) like Impala
has, but that's my opinion :-)

Regards

Antoine.


> On Thu, Feb 6, 2020 at 9:26 AM Wes McKinney <we...@gmail.com> wrote:
>>
>> On Thu, Feb 6, 2020 at 2:46 AM Antoine Pitrou <so...@pitrou.net> wrote:
>>>
>>> On Wed, 5 Feb 2020 15:46:15 -0600
>>> Wes McKinney <we...@gmail.com> wrote:
>>>>
>>>> I'll comment in more detail on some of the other items in due course,
>>>> but I think this should be handled by an implementation of
>>>> RandomAccessFile (that wraps a naked RandomAccessFile) with some
>>>> additional methods, rather than adding this to the abstract
>>>> RandomAccessFile interface, e.g.
>>>>
>>>> class CachingInputFile : public RandomAccessFile {
>>>>  public:
>>>>    CachingInputFile(std::shared_ptr<RandomAccessFile> naked_file);
>>>>    Status CacheRanges(...);
>>>> };
>>>>
>>>> etc.
>>>
>>> IMHO it may be more beneficial to expose it as an asynchronous API on
>>> RandomAccessFile, for example:
>>> class RandomAccessFile {
>>>  public:
>>>   struct Range {
>>>     int64_t offset;
>>>     int64_t length;
>>>   };
>>>
>>>   std::vector<Promise<std::shared_ptr<Buffer>>>
>>>     ReadRangesAsync(std::vector<Range> ranges);
>>> };
>>>
>>>
>>> The reason is that some APIs such as the C++ AWS S3 API have their own
>>> async support, which may be beneficial to use over a generic Arrow
>>> thread-pool implementation.
>>>
>>> Also, by returning a Promise instead of simply caching the results, you
>>> make it easier to handle the lifetime of the results.
>>
>> This seems useful, too. It becomes a question of where do you want to
>> manage the cached memory segments, however you obtain them. I'm
>> arguing that we should not have much custom code in the Parquet
>> library to manage the prefetched segments (and providing the correct
>> buffer slice to each column reader when they need it), and instead
>> encapsulate this logic so it can be reused.
>>
>> The API I proposed was just a mockup, I agree it would make sense for
>> the prefetching to occur asynchronously so that a column reader can
>> proceed as soon as its coalesced chunk has been prefetched, rather
>> than having to wait synchronously for all prefetching to complete.
>>
>>>
>>> (Promise<T> can be something like std::future<Result<T>>, though
>>> std::future<> has annoying limitations and we may want to write our own
>>> instead)
>>>
>>> Regards
>>>
>>> Antoine.
>>>
>>>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
In case folks are interested in how some other systems deal with IO
management / scheduling, the comments in

https://github.com/apache/impala/blob/master/be/src/runtime/io/disk-io-mgr.h

and related files might be interesting

On Thu, Feb 6, 2020 at 9:26 AM Wes McKinney <we...@gmail.com> wrote:
>
> On Thu, Feb 6, 2020 at 2:46 AM Antoine Pitrou <so...@pitrou.net> wrote:
> >
> > On Wed, 5 Feb 2020 15:46:15 -0600
> > Wes McKinney <we...@gmail.com> wrote:
> > >
> > > I'll comment in more detail on some of the other items in due course,
> > > but I think this should be handled by an implementation of
> > > RandomAccessFile (that wraps a naked RandomAccessFile) with some
> > > additional methods, rather than adding this to the abstract
> > > RandomAccessFile interface, e.g.
> > >
> > > class CachingInputFile : public RandomAccessFile {
> > >  public:
> > >    CachingInputFile(std::shared_ptr<RandomAccessFile> naked_file);
> > >    Status CacheRanges(...);
> > > };
> > >
> > > etc.
> >
> > IMHO it may be more beneficial to expose it as an asynchronous API on
> > RandomAccessFile, for example:
> > class RandomAccessFile {
> >  public:
> >   struct Range {
> >     int64_t offset;
> >     int64_t length;
> >   };
> >
> >   std::vector<Promise<std::shared_ptr<Buffer>>>
> >     ReadRangesAsync(std::vector<Range> ranges);
> > };
> >
> >
> > The reason is that some APIs such as the C++ AWS S3 API have their own
> > async support, which may be beneficial to use over a generic Arrow
> > thread-pool implementation.
> >
> > Also, by returning a Promise instead of simply caching the results, you
> > make it easier to handle the lifetime of the results.
>
> This seems useful, too. It becomes a question of where do you want to
> manage the cached memory segments, however you obtain them. I'm
> arguing that we should not have much custom code in the Parquet
> library to manage the prefetched segments (and providing the correct
> buffer slice to each column reader when they need it), and instead
> encapsulate this logic so it can be reused.
>
> The API I proposed was just a mockup, I agree it would make sense for
> the prefetching to occur asynchronously so that a column reader can
> proceed as soon as its coalesced chunk has been prefetched, rather
> than having to wait synchronously for all prefetching to complete.
>
> >
> > (Promise<T> can be something like std::future<Result<T>>, though
> > std::future<> has annoying limitations and we may want to write our own
> > instead)
> >
> > Regards
> >
> > Antoine.
> >
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Thu, Feb 6, 2020 at 2:46 AM Antoine Pitrou <so...@pitrou.net> wrote:
>
> On Wed, 5 Feb 2020 15:46:15 -0600
> Wes McKinney <we...@gmail.com> wrote:
> >
> > I'll comment in more detail on some of the other items in due course,
> > but I think this should be handled by an implementation of
> > RandomAccessFile (that wraps a naked RandomAccessFile) with some
> > additional methods, rather than adding this to the abstract
> > RandomAccessFile interface, e.g.
> >
> > class CachingInputFile : public RandomAccessFile {
> >  public:
> >    CachingInputFile(std::shared_ptr<RandomAccessFile> naked_file);
> >    Status CacheRanges(...);
> > };
> >
> > etc.
>
> IMHO it may be more beneficial to expose it as an asynchronous API on
> RandomAccessFile, for example:
> class RandomAccessFile {
>  public:
>   struct Range {
>     int64_t offset;
>     int64_t length;
>   };
>
>   std::vector<Promise<std::shared_ptr<Buffer>>>
>     ReadRangesAsync(std::vector<Range> ranges);
> };
>
>
> The reason is that some APIs such as the C++ AWS S3 API have their own
> async support, which may be beneficial to use over a generic Arrow
> thread-pool implementation.
>
> Also, by returning a Promise instead of simply caching the results, you
> make it easier to handle the lifetime of the results.

This seems useful, too. It becomes a question of where do you want to
manage the cached memory segments, however you obtain them. I'm
arguing that we should not have much custom code in the Parquet
library to manage the prefetched segments (and providing the correct
buffer slice to each column reader when they need it), and instead
encapsulate this logic so it can be reused.

The API I proposed was just a mockup, I agree it would make sense for
the prefetching to occur asynchronously so that a column reader can
proceed as soon as its coalesced chunk has been prefetched, rather
than having to wait synchronously for all prefetching to complete.

>
> (Promise<T> can be something like std::future<Result<T>>, though
> std::future<> has annoying limitations and we may want to write our own
> instead)
>
> Regards
>
> Antoine.
>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <so...@pitrou.net>.
On Wed, 5 Feb 2020 15:46:15 -0600
Wes McKinney <we...@gmail.com> wrote:
> 
> I'll comment in more detail on some of the other items in due course,
> but I think this should be handled by an implementation of
> RandomAccessFile (that wraps a naked RandomAccessFile) with some
> additional methods, rather than adding this to the abstract
> RandomAccessFile interface, e.g.
> 
> class CachingInputFile : public RandomAccessFile {
>  public:
>    CachingInputFile(std::shared_ptr<RandomAccessFile> naked_file);
>    Status CacheRanges(...);
> };
> 
> etc.

IMHO it may be more beneficial to expose it as an asynchronous API on
RandomAccessFile, for example:

class RandomAccessFile {
 public:
  struct Range {
    int64_t offset;
    int64_t length;
  };

  std::vector<Promise<std::shared_ptr<Buffer>>>
    ReadRangesAsync(std::vector<Range> ranges);
};


The reason is that some APIs such as the C++ AWS S3 API have their own
async support, which may be beneficial to use over a generic Arrow
thread-pool implementation.

Also, by returning a Promise instead of simply caching the results, you
make it easier to handle the lifetime of the results.


(Promise<T> can be something like std::future<Result<T>>, though
std::future<> has annoying limitations and we may want to write our own
instead)

Regards

Antoine.



Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
On Wed, Feb 5, 2020 at 3:37 PM David Li <li...@gmail.com> wrote:
>
> Hi Antoine and Wes,
>
> Thanks for the feedback. Yes, we should definitely consider these as
> separate features.
>
> I agree that it makes sense for the file API (or a derived API) to
> expose a generic CacheRanges or PrebufferRanges API. It could then do
> coalescing and prefetching as desired based on the actual underlying
> data store. (Or, perhaps, the actual prefetch/coalesce steps could be
> implemented as wrappers also implementing the file API, with
> parameters to tune for concurrency level/resource consumption/backend
> storage characteristics; underlying file implementations would just
> treat this as a no-op. This is what we prototyped internally in C++
> before writing this up - a wrapper around S3File that naively
> prefetched the entire file.)
>
> As a separate step, prefetching/caching should also make use of a
> global (or otherwise shared) IO thread pool, so that parallel reads of
> different files implicitly coordinate work with each other as well.
> Then, you could queue up reads of several Parquet files, such that a
> slow network call for one file doesn't block progress for other files,
> without issuing reads for all of these files at once.
>
> It's unclear to me what readahead at the record batch level would
> accomplish - Parquet reads each column chunk in a row group as a
> whole, and if the row groups are large, then multiple record batches
> would fall in the same row group, so then we wouldn't gain any
> parallelism, no? (Admittedly, I'm not familiar with the internals
> here.)
>
> The concurrency manager would apply mostly to networked filesystems,
> yes. It's more an observation that some workloads issue fewer reads of
> large ranges, and other workloads issue lots of reads on small ranges,
> so concurrency should be limited based on estimated bandwidth, and not
> purely on number of concurrent tasks.
>
> In this case, it sounds like there are at least the following concrete tasks:
> - Adding a (no-op) CacheRanges method to the RandomAccessFile API.

I'll comment in more detail on some of the other items in due course,
but I think this should be handled by an implementation of
RandomAccessFile (that wraps a naked RandomAccessFile) with some
additional methods, rather than adding this to the abstract
RandomAccessFile interface, e.g.

class CachingInputFile : public RandomAccessFile {
 public:
   CachingInputFile(std::shared_ptr<RandomAccessFile> naked_file);
   Status CacheRanges(...);
};

etc.

> - Providing a column range hint to Parquet, and computing column chunk
> ranges to buffer via CacheRanges,
> - Providing an implementation of CacheRanges (e.g. via a wrapper
> RandomAccessFile) that coalesces ranges based on backend storage
> characteristics,
> - Providing an implementation of CacheRanges that actually prefetches
> ranges in the background,
> - Providing a shared I/O pool to coordinate such I/O across multiple files,
> - Providing some mechanism to limit concurrency in this shared pool,
> - Sending down all these hints from the Datasets implementation.
>
> Thanks,
> David
>
>
> On 2/5/20, Wes McKinney <we...@gmail.com> wrote:
> > I agree with separating the problem into its constituent concerns to
> > make sure that we are developing appropriate abstractions.
> >
> > Speaking specifically about the Parquet codebase, the way that we
> > access a particular ColumnChunk in a row group is fairly simplistic.
> > See the ReaderProperties::GetStream method
> >
> > https://github.com/apache/arrow/blob/master/cpp/src/parquet/properties.cc#L28
> >
> > Rather than naively issuing a Read command (either buffered or
> > unbuffered, both bad for filesystems like S3), I think we need to
> > insert an abstraction at the point where the column reader class
> > requests an InputStream for its serialized column chunk data, which is
> > right here
> >
> > https://github.com/apache/arrow/blob/master/cpp/src/parquet/file_reader.cc#L123
> >
> > It seems like a stateful object that allows certain byte ranges of the
> > file to be cached would do the trick.
> >
> > Next, an API needs to be provided for applications to indicate which
> > column chunks they intend to read so that the contiguous byte ranges
> > can be pre-buffered, preventing any column reader from issuing naked
> > IO calls. Seems like this should happen at the ReaderProperties level.
> >
> > The IO scheduling seems like it should be abstracted away from the
> > Parquet library itself. So there would be code similar to
> >
> > std::pair<int64_t, int64_t> read_ranges = ComputePrebufferedRanges();
> > RETURN_NOT_OK(caching_file->CacheRanges(read_ranges));
> >
> > The IO scheduling can happen inside the implementation of CacheRanges.
> > Then when the column reader is created it will grab a slice of the
> > cached data rather than issuing IO calls.
> >
> > Let me know if this analysis makes sense
> >
> > - Wes
> >
> > On Wed, Feb 5, 2020 at 9:24 AM Antoine Pitrou <an...@python.org> wrote:
> >>
> >>
> >> Hi David,
> >>
> >> I think we should discuss this as individual features.
> >>
> >> > Read Coalescing: from Parquet metadata, we know exactly which byte
> >> > ranges of a file will be read, and can “cheat” in the S3 IO
> >> > layer by fetching them in advance
> >>
> >> It seems there are two things here: coalescing individual reads, and
> >> issuing them in advance.  It seems those are separate concerns.
> >>
> >> - coalescing reads: should the IO layer expose a ReadRanges function to
> >> issue several reads at once, which the Parquet layer can then exploit?
> >>
> >> - issuing reads in advance: isn't that solved by readahead *at the
> >> record batch level* (not the IO block level)?
> >>
> >> > Concurrency Manager: rather than limit parallelism by number of
> >> > outstanding tasks, we can instead limit the estimated bandwidth
> >> > consumption, allowing better performance when read sizes are small.
> >>
> >> - Concurrency Manager: is this a per-source optimization, applying
> >> mainly to networked filesystems?
> >>
> >>
> >> I think we want to make sure that each feature brings progress, instead
> >> of trying to lump everything at once in a big PR.
> >>
> >> Regards
> >>
> >> Antoine.
> >>
> >>
> >>
> >> On 05/02/2020 at 14:32, David Li wrote:
> >> > Hello all,
> >> >
> >> > We've been following the Arrow Datasets project with great interest,
> >> > especially as we have an in-house library with similar goals built on
> >> > top of PyArrow. Recently, we noticed some discussion around optimizing
> >> > I/O for such use cases (e.g. PARQUET-1698), which is also where we had
> >> > focused our efforts.
> >> >
> >> > Our long-term goal has been to open-source our library. However, our
> >> > code is in Python, but it would be most useful to everyone in the C++
> >> > core, so that R, Python, Ruby, etc. could benefit. Thus, we'd like to
> >> > share our high-level design, and offer to work with the community on
> >> > the implementation - at the very least, to avoid duplicating work.
> >> > We've summarized our approach, and hope this can start a discussion on
> >> > how to integrate such optimizations into Datasets:
> >> > https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit#
> >> >
> >> > At a high level, we have three main optimizations:
> >> > - Given a set of columns to read, and potentially a filter on a
> >> > partition key, we can use Parquet metadata to compute exact byte
> >> > ranges to read from remote storage, and coalesce/split up reads as
> >> > necessary based on the characteristics of the storage platform.
> >> > - Given byte ranges to read, we can read them in parallel, using a
> >> > global thread pool and concurrency manager to limit parallelism and
> >> > resource consumption.
> >> > - By working at the level of a dataset, we can parallelize these
> >> > operations across files, and pipeline steps like reading Parquet
> >> > metadata with reading and deserialization.
> >> >
> >> > We focus on Parquet and S3/object storage here, but these concepts
> >> > apply to other file formats and storage systems.
> >> >
> >> > The main questions here are whether we think the optimizations are
> >> > useful for Arrow Datasets, and if so, how the API design and
> >> > implementation would proceed - I'd appreciate any feedback on the
> >> > approach here and potential API.
> >> >
> >> > David
> >> >
> >

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <so...@pitrou.net>.
On Wed, 5 Feb 2020 16:37:17 -0500
David Li <li...@gmail.com> wrote:
> 
> As a separate step, prefetching/caching should also make use of a
> global (or otherwise shared) IO thread pool, so that parallel reads of
> different files implicitly coordinate work with each other as well.
> Then, you could queue up reads of several Parquet files, such that a
> slow network call for one file doesn't block progress for other files,
> without issuing reads for all of these files at once.

Typically you can solve this by having enough IO concurrency at once :-)
I'm not sure having sophisticated global coordination (based on which
algorithms) would bring anything.  Would you care to elaborate?

> It's unclear to me what readahead at the record batch level would
> accomplish - Parquet reads each column chunk in a row group as a
> whole, and if the row groups are large, then multiple record batches
> would fall in the same row group, so then we wouldn't gain any
> parallelism, no? (Admittedly, I'm not familiar with the internals
> here.)

Well, if each row group is read as a whole, then readahead can be
applied at the row group level (e.g. read K row groups in advance).

Regards

Antoine.



Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by David Li <li...@gmail.com>.
Hi Antoine and Wes,

Thanks for the feedback. Yes, we should definitely consider these as
separate features.

I agree that it makes sense for the file API (or a derived API) to
expose a generic CacheRanges or PrebufferRanges API. It could then do
coalescing and prefetching as desired based on the actual underlying
data store. (Or, perhaps, the actual prefetch/coalesce steps could be
implemented as wrappers also implementing the file API, with
parameters to tune for concurrency level/resource consumption/backend
storage characteristics; underlying file implementations would just
treat this as a no-op. This is what we prototyped internally in C++
before writing this up - a wrapper around S3File that naively
prefetched the entire file.)

As a separate step, prefetching/caching should also make use of a
global (or otherwise shared) IO thread pool, so that parallel reads of
different files implicitly coordinate work with each other as well.
Then, you could queue up reads of several Parquet files, such that a
slow network call for one file doesn't block progress for other files,
without issuing reads for all of these files at once.

It's unclear to me what readahead at the record batch level would
accomplish - Parquet reads each column chunk in a row group as a
whole, and if the row groups are large, then multiple record batches
would fall in the same row group, so then we wouldn't gain any
parallelism, no? (Admittedly, I'm not familiar with the internals
here.)

The concurrency manager would apply mostly to networked filesystems,
yes. It's more an observation that some workloads issue fewer reads of
large ranges, and other workloads issue lots of reads on small ranges,
so concurrency should be limited based on estimated bandwidth, and not
purely on number of concurrent tasks.
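
One way to picture the point above is a scheduler that admits reads against
a budget of bytes in flight instead of a fixed task count. The sketch below
is single-threaded bookkeeping only; ByteBudgetScheduler and its methods are
hypothetical names, and using "bytes in flight" as a proxy for estimated
bandwidth is an assumption made for illustration.

```cpp
#include <cstdint>
#include <deque>
#include <vector>

// Admits queued reads in FIFO order while the total number of bytes
// in flight stays within a budget.
class ByteBudgetScheduler {
 public:
  explicit ByteBudgetScheduler(int64_t max_bytes_in_flight)
      : max_bytes_(max_bytes_in_flight) {}

  // Queue a read of `size` bytes.
  void Submit(int64_t size) { pending_.push_back(size); }

  // Start as many queued reads as the budget allows and return their sizes.
  std::vector<int64_t> StartReady() {
    std::vector<int64_t> started;
    while (!pending_.empty()) {
      const int64_t size = pending_.front();
      // Always admit one read when idle, so a read larger than the
      // whole budget cannot stall the queue forever.
      if (in_flight_ > 0 && in_flight_ + size > max_bytes_) break;
      in_flight_ += size;
      pending_.pop_front();
      started.push_back(size);
    }
    return started;
  }

  // Report that a previously started read of `size` bytes has finished.
  void Complete(int64_t size) { in_flight_ -= size; }

  int64_t bytes_in_flight() const { return in_flight_; }

 private:
  int64_t max_bytes_;
  int64_t in_flight_ = 0;
  std::deque<int64_t> pending_;
};
```

Under the same budget, many small reads are all admitted at once, while
large reads are admitted one at a time, which is the behaviour a pure task
count cannot express.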

In this case, it sounds like there are at least the following concrete tasks:
- Adding a (no-op) CacheRanges method to the RandomAccessFile API.
- Providing a column range hint to Parquet, and computing column chunk
ranges to buffer via CacheRanges,
- Providing an implementation of CacheRanges (e.g. via a wrapper
RandomAccessFile) that coalesces ranges based on backend storage
characteristics,
- Providing an implementation of CacheRanges that actually prefetches
ranges in the background,
- Providing a shared I/O pool to coordinate such I/O across multiple files,
- Providing some mechanism to limit concurrency in this shared pool,
- Sending down all these hints from the Datasets implementation.

Thanks,
David


On 2/5/20, Wes McKinney <we...@gmail.com> wrote:
> I agree with separating the problem into its constituent concerns to
> make sure that we are developing appropriate abstractions.
>
> Speaking specifically about the Parquet codebase, the way that we
> access a particular ColumnChunk in a row group is fairly simplistic.
> See the ReaderProperties::GetStream method
>
> https://github.com/apache/arrow/blob/master/cpp/src/parquet/properties.cc#L28
>
> Rather than naively issuing a Read command (either buffered or
> unbuffered, both bad for filesystems like S3), I think we need to
> insert an abstraction at the point where the column reader class
> requests an InputStream for its serialized column chunk data, which is
> right here
>
> https://github.com/apache/arrow/blob/master/cpp/src/parquet/file_reader.cc#L123
>
> It seems like a stateful object that allows certain byte ranges of the
> file to be cached would do the trick.
>
> Next, an API needs to be provided for applications to indicate which
> column chunks they intend to read so that the contiguous byte ranges
> can be pre-buffered, preventing any column reader from issuing naked
> IO calls. Seems like this should happen at the ReaderProperties level.
>
> The IO scheduling seems like it should be abstracted away from the
> Parquet library itself. So there would be code similar to
>
> std::pair<int64_t, int64_t> read_ranges = ComputePrebufferedRanges();
> RETURN_NOT_OK(caching_file->CacheRanges(read_ranges));
>
> The IO scheduling can happen inside the implementation of CacheRanges.
> Then when the column reader is created it will grab a slice of the
> cached data rather than issuing IO calls.
>
> Let me know if this analysis makes sense
>
> - Wes
>
> On Wed, Feb 5, 2020 at 9:24 AM Antoine Pitrou <an...@python.org> wrote:
>>
>>
>> Hi David,
>>
>> I think we should discuss this as individual features.
>>
>> > Read Coalescing: from Parquet metadata, we know exactly which byte
>> > ranges of a file will be read, and can “cheat” in the S3 IO
>> > layer by fetching them in advance
>>
>> It seems there are two things here: coalescing individual reads, and
>> issuing them in advance.  It seems those are separate concerns.
>>
>> - coalescing reads: should the IO layer expose a ReadRanges function to
>> issue several reads at once, which the Parquet layer can then exploit?
>>
>> - issuing reads in advance: isn't that solved by readahead *at the
>> record batch level* (not the IO block level)?
>>
>> > Concurrency Manager: rather than limit parallelism by number of
>> > outstanding tasks, we can instead limit the estimated bandwidth
>> > consumption, allowing better performance when read sizes are small.
>>
>> - Concurrency Manager: is this a per-source optimization, applying
>> mainly to networked filesystems?
>>
>>
>> I think we want to make sure that each feature brings progress, instead
>> of trying to lump everything at once in a big PR.
>>
>> Regards
>>
>> Antoine.
>>
>>
>>
>> On 05/02/2020 at 14:32, David Li wrote:
>> > Hello all,
>> >
>> > We've been following the Arrow Datasets project with great interest,
>> > especially as we have an in-house library with similar goals built on
>> > top of PyArrow. Recently, we noticed some discussion around optimizing
>> > I/O for such use cases (e.g. PARQUET-1698), which is also where we had
>> > focused our efforts.
>> >
>> > Our long-term goal has been to open-source our library. However, our
>> > code is in Python, but it would be most useful to everyone in the C++
>> > core, so that R, Python, Ruby, etc. could benefit. Thus, we'd like to
>> > share our high-level design, and offer to work with the community on
>> > the implementation - at the very least, to avoid duplicating work.
>> > We've summarized our approach, and hope this can start a discussion on
>> > how to integrate such optimizations into Datasets:
>> > https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit#
>> >
>> > At a high level, we have three main optimizations:
>> > - Given a set of columns to read, and potentially a filter on a
>> > partition key, we can use Parquet metadata to compute exact byte
>> > ranges to read from remote storage, and coalesce/split up reads as
>> > necessary based on the characteristics of the storage platform.
>> > - Given byte ranges to read, we can read them in parallel, using a
>> > global thread pool and concurrency manager to limit parallelism and
>> > resource consumption.
>> > - By working at the level of a dataset, we can parallelize these
>> > operations across files, and pipeline steps like reading Parquet
>> > metadata with reading and deserialization.
>> >
>> > We focus on Parquet and S3/object storage here, but these concepts
>> > apply to other file formats and storage systems.
>> >
>> > The main questions here are whether we think the optimizations are
>> > useful for Arrow Datasets, and if so, how the API design and
>> > implementation would proceed - I'd appreciate any feedback on the
>> > approach here and potential API.
>> >
>> > David
>> >
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Wes McKinney <we...@gmail.com>.
I agree with separating the problem into its constituent concerns to
make sure that we are developing appropriate abstractions.

Speaking specifically about the Parquet codebase, the way that we
access a particular ColumnChunk in a row group is fairly simplistic.
See the ReaderProperties::GetStream method

https://github.com/apache/arrow/blob/master/cpp/src/parquet/properties.cc#L28

Rather than naively issuing a Read command (either buffered or
unbuffered, both bad for filesystems like S3), I think we need to
insert an abstraction at the point where the column reader class
requests an InputStream for its serialized column chunk data, which is
right here

https://github.com/apache/arrow/blob/master/cpp/src/parquet/file_reader.cc#L123

It seems like a stateful object that allows certain byte ranges of the
file to be cached would do the trick.

Next, an API needs to be provided for applications to indicate which
column chunks they intend to read so that the contiguous byte ranges
can be pre-buffered, preventing any column reader from issuing naked
IO calls. Seems like this should happen at the ReaderProperties level.

The IO scheduling seems like it should be abstracted away from the
Parquet library itself. So there would be code similar to

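// Byte ranges of the column chunks the application intends to read
std::vector<std::pair<int64_t, int64_t>> read_ranges = ComputePrebufferedRanges();
// Fetch them up front; the IO scheduling lives behind CacheRanges
RETURN_NOT_OK(caching_file->CacheRanges(read_ranges));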

The IO scheduling can happen inside the implementation of CacheRanges.
Then when the column reader is created it will grab a slice of the
cached data rather than issuing IO calls.
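
As a rough illustration of the caching object described above, here is a minimal sketch. Only the name CacheRanges comes from this message; FakeRemoteFile, CachingFile, Slice, and the (offset, length) meaning of each pair are assumptions made for the example, not existing Arrow or Parquet APIs. The ranges are read serially here, whereas a real implementation would presumably schedule the IO inside CacheRanges.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a remote file; counts reads that reach storage.
struct FakeRemoteFile {
  std::string data;
  int num_reads = 0;

  std::string ReadAt(int64_t offset, int64_t length) {
    assert(offset >= 0 && length >= 0 &&
           offset + length <= static_cast<int64_t>(data.size()));
    ++num_reads;
    return data.substr(static_cast<size_t>(offset),
                       static_cast<size_t>(length));
  }
};

// Hypothetical caching wrapper: CacheRanges() fetches byte ranges up front,
// Slice() serves column readers from the cached bytes without further IO.
class CachingFile {
 public:
  explicit CachingFile(FakeRemoteFile* file) : file_(file) {}

  // Each range is assumed to be an (offset, length) pair.
  void CacheRanges(const std::vector<std::pair<int64_t, int64_t>>& ranges) {
    for (const auto& range : ranges) {
      cache_[range.first] = file_->ReadAt(range.first, range.second);
    }
  }

  // Returns true and fills *out if [offset, offset + length) lies entirely
  // inside one cached range; returns false otherwise.
  bool Slice(int64_t offset, int64_t length, std::string* out) const {
    auto it = cache_.upper_bound(offset);  // first range starting after offset
    if (it == cache_.begin()) return false;
    --it;  // last range starting at or before offset
    const int64_t start = it->first;
    const int64_t end = start + static_cast<int64_t>(it->second.size());
    if (offset + length > end) return false;
    *out = it->second.substr(static_cast<size_t>(offset - start),
                             static_cast<size_t>(length));
    return true;
  }

 private:
  FakeRemoteFile* file_;
  std::map<int64_t, std::string> cache_;  // range start offset -> bytes
};
```

With this shape, a column reader would call Slice for its column chunk and only fall back to a direct read (or report an error) when the requested bytes were never pre-buffered.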

Let me know if this analysis makes sense

- Wes

On Wed, Feb 5, 2020 at 9:24 AM Antoine Pitrou <an...@python.org> wrote:
>
>
> Hi David,
>
> I think we should discuss this as individual features.
>
> > Read Coalescing: from Parquet metadata, we know exactly
> > which byte ranges of a file will be read, and can “cheat” in the S3 IO
> > layer by fetching them in advance
>
> It seems there are two things here: coalescing individual reads, and
> issuing them in advance.  It seems those are separate concerns.
>
> - coalescing reads: should the IO layer expose a ReadRanges function to
> issue several reads at once, which the Parquet layer can then exploit?
>
> - issuing reads in advance: isn't that solved by readahead *at the
> record batch level* (not the IO block level)?
>
> > Concurrency Manager: rather than limit parallelism by number of
> > outstanding tasks, we can instead limit the estimated bandwidth
> > consumption, allowing better performance when read sizes are small.
>
> - Concurrency Manager: is this a per-source optimization, applying
> mainly to networked filesystems?
>
>
> I think we want to make sure that each feature brings progress, instead
> of trying to lump everything at once in a big PR.
>
> Regards
>
> Antoine.
>
>
>

Re: [Discuss] Proposal for optimizing Datasets over S3/object storage

Posted by Antoine Pitrou <an...@python.org>.
Hi David,

I think we should discuss this as individual features.

> Read Coalescing: from Parquet metadata, we know exactly
> which byte ranges of a file will be read, and can “cheat” in the S3 IO
> layer by fetching them in advance

It seems there are two things here: coalescing individual reads, and
issuing them in advance.  It seems those are separate concerns.

- coalescing reads: should the IO layer expose a ReadRanges function to
issue several reads at once, which the Parquet layer can then exploit?

- issuing reads in advance: isn't that solved by readahead *at the
record batch level* (not the IO block level)?
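
To illustrate the coalescing half of the question, here is a minimal sketch of what could sit behind a ReadRanges-style call. The ReadRange struct, the CoalesceRanges function, and the hole_size_limit parameter are hypothetical names chosen for the example; nothing here is an existing Arrow API. Ranges separated by a gap no larger than the limit are merged into a single read, on the assumption that over-reading a few bytes is cheaper than an extra request on stores like S3.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical byte range covering [offset, offset + length).
struct ReadRange {
  int64_t offset;
  int64_t length;
};

// Merge ranges whose gap is at most hole_size_limit bytes, so that several
// small reads become one larger request. Input order does not matter.
std::vector<ReadRange> CoalesceRanges(std::vector<ReadRange> ranges,
                                      int64_t hole_size_limit) {
  assert(hole_size_limit >= 0);
  std::sort(ranges.begin(), ranges.end(),
            [](const ReadRange& a, const ReadRange& b) {
              return a.offset < b.offset;
            });
  std::vector<ReadRange> merged;
  for (const ReadRange& r : ranges) {
    if (!merged.empty()) {
      ReadRange& last = merged.back();
      const int64_t last_end = last.offset + last.length;
      if (r.offset - last_end <= hole_size_limit) {
        // Close enough (or overlapping): extend the previous range.
        const int64_t new_end = std::max(last_end, r.offset + r.length);
        last.length = new_end - last.offset;
        continue;
      }
    }
    merged.push_back(r);
  }
  return merged;
}
```

The Parquet layer would then only need to hand over the column chunk ranges it computed from the file metadata, while the choice of hole_size_limit stays with the filesystem, which knows its own latency and throughput characteristics.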

> Concurrency Manager: rather than limit parallelism by number of
> outstanding tasks, we can instead limit the estimated bandwidth
> consumption, allowing better performance when read sizes are small.

- Concurrency Manager: is this a per-source optimization, applying
mainly to networked filesystems?
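
For reference, one way to read the quoted Concurrency Manager idea is as a budget on bytes in flight rather than on task count. The sketch below is only an assumption about how such a limiter might look; ByteBudgetLimiter and its methods are hypothetical names, and the design document linked in the original message may do this differently.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical limiter: admits reads while the estimated bytes in flight
// stay within a budget, instead of counting outstanding tasks.
class ByteBudgetLimiter {
 public:
  explicit ByteBudgetLimiter(int64_t max_bytes_in_flight)
      : max_bytes_(max_bytes_in_flight) {
    assert(max_bytes_in_flight > 0);
  }

  // Returns true if a read of nbytes may start now. A read larger than the
  // whole budget is admitted only when nothing else is in flight, so it is
  // never rejected forever.
  bool TryAcquire(int64_t nbytes) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (in_flight_ > 0 && in_flight_ + nbytes > max_bytes_) return false;
    in_flight_ += nbytes;
    return true;
  }

  // Called when a read of nbytes completes.
  void Release(int64_t nbytes) {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(nbytes <= in_flight_);
    in_flight_ -= nbytes;
  }

  int64_t bytes_in_flight() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return in_flight_;
  }

 private:
  mutable std::mutex mutex_;
  const int64_t max_bytes_;
  int64_t in_flight_ = 0;
};
```

With a 1 MiB budget, sixteen 64 KiB reads can be outstanding at once, where a fixed cap of a few tasks would leave the connection mostly idle; a single large read, by contrast, takes the whole budget. Whether one such limiter should exist per source or globally is exactly the open question above.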


I think we want to make sure that each feature brings progress, instead
of trying to lump everything at once in a big PR.

Regards

Antoine.


