You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/12 08:44:47 UTC

[GitHub] [arrow-datafusion] tustvold opened a new issue, #2205: RFC: Spill-To-Disk Object Storage Download

tustvold opened a new issue, #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205

   **Creating as high-level ticket to hopefully get consensus on the approach, before potentially creating lower level tickets**
   
   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   
   Currently `ObjectStore::file_reader` returns an `Arc<dyn ObjectReader>`, this in turn has a method `ObjectReader::sync_chunk_reader` which takes a byte range. 
   
   In the case of parquet, a `ChunkObjectReader` wraps this `ObjectReader` and adapts it to the `parquet::ChunkReader` trait. The result is that the parquet reader calls `ObjectReader::sync_chunk_reader` for the byte range of each column chunk, of which there will be one per-column per-RowGroup, which in turn performs a range request to object storage to fetch the bytes. 
   
   As pointed out by @mateuszkj on https://github.com/datafusion-contrib/datafusion-objectstore-s3/pull/53 this unfortunately results in a large number of small requests to S3 (there are also metadata requests which I will cover in a separate ticket concerning catalogs).
   
   In the case of CSV, JSON, etc... `ObjectReader::sync_reader` is used which is equivalent to calling `sync_chunk_reader` with the length of the file, and will therefore buffer the entire file in memory.
   
   This approach therefore has two aspects that could be improved:
   
   * Potentially large numbers of very small requests to object storage adding latency and cost
   * Potentially large amounts of data buffered in memory
   
   **Describe the solution you'd like**
   
   The simplest solution is to just download the entire file to temporary local storage. This is what [IOx](https://github.com/influxdata/influxdb_iox/blob/main/parquet_file/src/storage.rs#L280) currently does and it works well.
   
   The next obvious improvement would then be to use the MemoryManager and DiskManager functionality added by @yjshen in https://github.com/apache/arrow-datafusion/pull/1526 to buffer in memory initially and only spill to disk under memory pressure. 
   
   I suspect for many use-cases this will perform very well, the key observations being:
   
   * Data stored in non-archival object store tiers is billed on request count, and not the amount of data transferred
   * Data transfer from object storage within the same region is extremely fast (10+ Gbps)
   
   A final extension might be to add functionality to fetch smaller byte ranges based on projection and predicate pushdown, I started experimenting with an API of what this might look like [here](https://github.com/apache/arrow-rs/pull/1509), but I don't have a good handle on how to balance the trade-offs of making too many requests vs requesting data we don't need, and I'm personally inclined to punt on this at least initially...
   
   I'm not very familiar with how spark, etc... solve this problem, this is just based on my intuition, and so perhaps @sunchao or someone with more familiarity with that ecosystem might be able to provide some insight here.
   
   **Describe alternatives you've considered**
   
   One option we are likely to implement for [IOx](https://github.com/influxdata/influxdb_iox) is having a shared, instance-local, read-through, disk-based Object Storage cache. The idea being to use the [ephemeral NVMe disk](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/InstanceStorage.html) that is available on cloud provider VMs as a shared cache for one or more query engines running on that instance. This effectively works around this problem by making all IO done by the query engine to very fast local disk, with a separate process handling interaction with object storage as required. It will also accelerate repeated queries to the same "hot" dataset. I would be very happy to write up some tickets if someone wanted to take this on.
   
   [This blog post](https://jorgecarleitao.medium.com/how-to-efficiently-load-data-to-memory-d65ee359196c ) written by @jorgecarleitao proposes streaming files block-wise (thank you @xudong963 for the link). This is close to what the implementation currently does, however, it comes with the drawbacks listed above. *FWIW I have also not found this approach to perform especially well on local files either, see [here](https://github.com/apache/arrow-rs/issues/1473), but I could have been doing something wrong*.
   
   **Additional context**
   
   FYI @alamb @houqp @matthewmturner
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1098045187

   I think this makes sense, the situation I'm trying to avoid is:
   
   * If you have the file on local disk we don't want to have to buffer it in memory before we can read it
   * If you have the file in memory, we don't want to copy the byte ranges, and just want to slice a `Bytes` object
   
   Currently the trait returns `Box<dyn Read>` which is likely sub-optimal for both.
   
   If we can find some way to handle this, that sounds good to me 😀


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1097902378

   Thus, if I were doing this I would probably make the following three things
   1. Add a "prefetch_hint" certain offsets to the `ObjectStore` API and make the parquet reader call it
   2. Implement a "buffered" ObjectStore interface that wrapped another ObjectStore that prefetches and buffers data memory buffer
   3. Implement a "cached" ObjectStore interface that also wraps another ObjectStore that simply downloads any request to a local disk cache
   
   With those components I think most usecases could be addressed and if someone needed custom caching logic they would likely get a good head start using the "buffered" or "cached" interfaces


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] Cheappie commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
Cheappie commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1096403003

   > * Data transfer from object storage within the same region is extremely fast (10+ Gbps)
   
   From what I know such high transfer speed you can only achieve between EC2 instances in **cluster** placement group. For example If I remember correctly r4.2xlarge achieves ~160 MB/s transfer between ec2 and s3. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] steveloughran commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
steveloughran commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1100069800

   choosing when/how to scan and prefetch in object stores is a real tricky business
   
   abfs and gcs connectors do forward prefetch in block sizes you can config in hadoop site/job settings, cache into memory. The more prefetching you do, the more likely a large process will run out of memory.
   
   s3a doesn't and we've been getting complaints about lack of buffering in the client. it does have different seek policies, look at fs.s3a.experimental.fadvise and fs.s3a.readahead.range
   
   You can set seek policy cluster-wise, or, if you use the openFile() api, when opening specific files.
   
   we have two big bits of work on going there how to help mitigate things, both in feature branches right now
   * HADOOP-18103 vectored IO API. It will be available for all FSDataInputStream; object stores can improve with range coalescing and fetching of different ranges in parallel (s3a will be first for this).
   * HADOOP-18028. High performance S3A input stream with prefetching & caching to local disk. feature branch works, but for broader adoption we again need to deal with memory/buffer use and some other issues.
   Really good to have you involved in reviewing/testing the vectored IO API (yes, we want a native binding too), the prefetching work, and indeed if we can get good traces of how your library reads files.
   
   Note also s3a and abfs connectors connect/report stats through the IOStatistics interface. Even if you build against Hadoop versions which don't have that,
   1.  if you call toString() on the streams you get a good summary of what IO took place in that stream only. log this, at debug
   2. on hadoop 3.3.2, set "fs.iostatistics.logging.level"; to info and you get full fs stats dump when the fs instance is closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1097902278

   I see two major, and somewhat orthogonal usecases:
   
   *Usecase*: Multiple reads of unpredictable column / row group subsets of the same file (e.g. IOx)
   *Optimal*: Read data to local file
   
   *Goal*: Single read of a subset of column/row groups (e.g. Cloud Fuse, other "analytics on S3 parquet files")
   *Optimal*: Read subset of the data that is needed into memory, discard after decode 
   
   I have been hoping our ObjectStore interface would allow for both usecases. 
   
   In terms of the "many small requests to S3" problem, I was imagining that the S3 ObjectStore implementation would implement "pre-fetching" internally (the same way local filesystems do) to coalesce multiple small requests into fewer larger ones.  This strategy is particularly effective if we know what parts of the file are likely to be needed.
   
   Conveniently, the parquet format is quite amenable to this (as once the reader has figured out it wants to scan a row group, it also knows what file data (offsets) it needs). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] sunchao commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
sunchao commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1099675328

   FWIW within each Spark task, it currently process each row group in a sequential manner, and for each of these it'll read all the projected column chunks (with filtered pages after column index), buffer them in memory and then start decompressing + decoding. For interacting with S3/HDFS/etc, it relies on the Hadoop's [FileSystem](https://hadoop.apache.org/docs/r3.1.0/api/org/apache/hadoop/fs/FileSystem.html) API. @steveloughran is the expert here on the S3 client implementation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1097930579

   I think we all agree on where we would like to end up, however, I worry we are trying to run before we can walk here. I would much prefer an approach that does the simplest thing possible, namely downloads the entire file, and then iteratively add functionality, such as fetching to memory, selective fetching, etc... Currently we have an approach that isn't really very effective at either...
   
   > the same way local filesystems do
   
   I'm not sure this is a fair comparison, object storage has vastly different performance and billing characteristics from a local filesystem?
   
   > Add a "prefetch_hint" certain offsets to the ObjectStore API and make the parquet reader call it
   
   Why would you implement this in the ObjectStore API, and not some component generic over object stores. The caching, spilling, logic, etc... is not going to vary based on object store provider? An ObjectStore API that supports fetch requests with an optional byte range should have us covered?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1096420888

   I thought placement groups were a mechanism to improve EC2-EC2 traffic, and not EC2-S3? I'll do some digging and report back. I had always assumed EC2-S3 was so much faster than EC2-EC2 because they had dedicated networking for it, but perhaps I was mistaken...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] steveloughran commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
steveloughran commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1100073404

   + @mukund-thakur
   
   if the api you came up with mapped well to that vectored api *which is not yet too late to freeze*, then it'd be really good, even if you don't yet compile against releases with that api.
   
   see https://github.com/apache/hadoop/blob/feature-vectored-io/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md#default-void-readvectoredlist-extends-filerange-ranges-intfunctionbytebuffer-allocate
   
   ```java
   readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
   ```
   
   where each file range returns a completable future to a byte buffer allocated with the allocator function you supplied.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1106484122

   > It seems like something any of those pre-fetching systems requires is something to tell what to prefetch (or what ranges are needed in scatter / gather or vectored IO.
   
   Proposal in https://github.com/apache/arrow-rs/issues/1605, thank you all for your very helpful feedback :+1:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1243556353

   DataFusion is now using the async parquet interface, which automatically handles buffering, and so this can be closed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1100171031

   Thank you @steveloughran  
   
   It seems like something any of those pre-fetching systems requires is something to tell what to prefetch (or what ranges are needed in scatter / gather or vectored IO.
   
   Maybe that is a good place to start @tustvold 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1097971612

   > Why would you implement this in the ObjectStore API, and not some FileScan component generic over object stores. The caching, spilling, logic, etc... is not going to vary based on object store provider? An ObjectStore API that supports fetch requests with an optional byte range should have us covered?
   
   I was thinking that keeping things behind an ObjectStore API makes sense because:
   1.  the economies and performance of S3, glacier, HDFS, local Minio could be quite different so the amount of consolidation, number of requests, aggressiveness of caching, might vary by object store implementation (not sure)
   2. Some caching strategies / implementations (e.g. redis, for example)  might not be appropriate to include in the core datafusion
   
   So in other words, binding details of caching / resource usage to DataFusion seemed to be unecessary
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
tustvold commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1096453560

   [This AWS blog post](https://aws.amazon.com/blogs/aws/the-floodgates-are-open-increased-network-bandwidth-for-ec2-instances/) from 2018 would suggest up to 25Gpbs EC2-S3 is possible and also highlights placement groups as a way to accelerate EC2-EC2. [This support question](https://aws.amazon.com/premiumsupport/knowledge-center/s3-maximum-transfer-speed-ec2/) would suggest the EC2-S3 limit has since been raised to 100Gbps.
   
   I also found [this benchmark](https://github.com/dvassallo/s3-benchmark#s3-to-ec2-bandwidth) from 2019, which shows speeds in the 1000s of MB/s, including 1,135 MB/s for the r4.2xlarge. I have not been able to find anyone complaining about the network speeds being below what is advertised.
   
   FWIW if using VPC networking, you need to make sure you have configured a [VPC Gateway](https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-s3.html) and are using a [region-specific endpoint](https://docs.aws.amazon.com/general/latest/gr/rande.html#regional-endpoints) for S3. Otherwise your traffic will transit an Internet Gateway or NAT gateway which will make things a lot slower (and cost a LOT of money).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] Cheappie commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
Cheappie commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1096623689

   > [This AWS blog post](https://aws.amazon.com/blogs/aws/the-floodgates-are-open-increased-network-bandwidth-for-ec2-instances/) from 2018 would suggest up to 25Gpbs EC2-S3 is possible and also highlights placement groups as a way to accelerate EC2-EC2. [This support question](https://aws.amazon.com/premiumsupport/knowledge-center/s3-maximum-transfer-speed-ec2/) would suggest the EC2-S3 limit has since been raised to 100Gbps.
   > 
   > I also found [this benchmark](https://github.com/dvassallo/s3-benchmark#s3-to-ec2-bandwidth) from 2019, which shows speeds in the 1000s of MB/s, including 1,135 MB/s for the r4.2xlarge. I have not been able to find anyone complaining about the network speeds being below what is advertised.
   > 
   > FWIW if using VPC networking, you need to make sure you have configured a [VPC Gateway](https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-s3.html) and are using a [region-specific endpoint](https://docs.aws.amazon.com/general/latest/gr/rande.html#regional-endpoints) for S3. Otherwise your traffic will transit an Internet Gateway or NAT gateway which will make things a lot slower (and cost a LOT of money).
   
   You might be right, I don't recall testing against region specific endpoint. That's really interesting, I will have to check that. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mukund-thakur commented on issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
mukund-thakur commented on issue #2205:
URL: https://github.com/apache/arrow-datafusion/issues/2205#issuecomment-1101579641

   Do try out the vectored api from the feature brach. Any feedback or improvements is highly appreciated. Thanks. 
   Here is the uber jira https://issues.apache.org/jira/browse/HADOOP-18103


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold closed issue #2205: RFC: Spill-To-Disk Object Storage Download

Posted by GitBox <gi...@apache.org>.
tustvold closed issue #2205: RFC: Spill-To-Disk Object Storage Download
URL: https://github.com/apache/arrow-datafusion/issues/2205


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org