Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2024/03/20 07:04:28 UTC

[PR] Implement MultipartStore for ThrottledStore [arrow-rs]

tustvold opened a new pull request, #5533:
URL: https://github.com/apache/arrow-rs/pull/5533

   # Which issue does this PR close?
   
   Closes #.
   
   # Rationale for this change
    
   Follow up to #5500 
   
   # What changes are included in this PR?
   
   * Implements MultipartStore for ThrottledStore
   * Limits concurrency in BufWriter (which #5500 removed)
   
   # Are there any user-facing changes?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Implement MultipartStore for ThrottledStore [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #5533:
URL: https://github.com/apache/arrow-rs/pull/5533#discussion_r1531589773


##########
object_store/src/buffered.rs:
##########
@@ -216,6 +216,7 @@ impl AsyncBufRead for BufReader {
 /// streamed using [`ObjectStore::put_multipart`]
 pub struct BufWriter {
     capacity: usize,
+    max_concurrency: usize,

Review Comment:
   Prior to #5500 this was an implementation detail of the stores; now it is user-configurable :tada:





Re: [PR] Implement MultipartStore for ThrottledStore [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #5533:
URL: https://github.com/apache/arrow-rs/pull/5533




Re: [PR] Implement MultipartStore for ThrottledStore [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #5533:
URL: https://github.com/apache/arrow-rs/pull/5533#discussion_r1531588213


##########
object_store/src/upload.rs:
##########
@@ -110,31 +113,44 @@ pub struct WriteMultipart {
 impl WriteMultipart {
     /// Create a new [`WriteMultipart`] that will upload using 5MB chunks
     pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
-        Self::new_with_capacity(upload, 5 * 1024 * 1024)
+        Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
     }
 
-    /// Create a new [`WriteMultipart`] that will upload in fixed `capacity` sized chunks
-    pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: usize) -> Self {
+    /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
+    pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {

Review Comment:
   I think this naming is a bit more obvious
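
   To illustrate why `chunk_size` reads more naturally than `capacity`, here is a minimal, synchronous sketch of fixed-size chunking. `ChunkedWriter`, `write`, and `finish` are hypothetical names invented for this example, not the `WriteMultipart` API; parts are collected in a `Vec` instead of being uploaded.

```rust
/// Hypothetical stand-in for a multipart writer (not the object_store API):
/// it buffers bytes and emits a part each time `chunk_size` bytes accumulate.
struct ChunkedWriter {
    chunk_size: usize,
    buffer: Vec<u8>,
    parts: Vec<Vec<u8>>, // completed parts, in the order they were filled
}

impl ChunkedWriter {
    fn new_with_chunk_size(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be non-zero");
        Self {
            chunk_size,
            buffer: Vec::with_capacity(chunk_size),
            parts: Vec::new(),
        }
    }

    /// Append `data`, emitting a part whenever the buffer becomes full.
    fn write(&mut self, mut data: &[u8]) {
        while !data.is_empty() {
            let remaining = self.chunk_size - self.buffer.len();
            let n = remaining.min(data.len());
            self.buffer.extend_from_slice(&data[..n]);
            data = &data[n..];
            if self.buffer.len() == self.chunk_size {
                // Swap in a fresh buffer and keep the full one as a part.
                let full = std::mem::replace(
                    &mut self.buffer,
                    Vec::with_capacity(self.chunk_size),
                );
                self.parts.push(full);
            }
        }
    }

    /// Emit any trailing partial chunk and return all parts.
    fn finish(mut self) -> Vec<Vec<u8>> {
        if !self.buffer.is_empty() {
            let last = std::mem::take(&mut self.buffer);
            self.parts.push(last);
        }
        self.parts
    }
}
```

   In this sketch the parameter fixes the size of every part except possibly the last, which is what the name `chunk_size` conveys; `capacity` would suggest only an initial buffer allocation.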





Re: [PR] Implement MultipartStore for ThrottledStore [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #5533:
URL: https://github.com/apache/arrow-rs/pull/5533#discussion_r1544324342


##########
object_store/src/buffered.rs:
##########
@@ -250,10 +251,21 @@ impl BufWriter {
         Self {
             capacity,
             store,
+            max_concurrency: 8,
             state: BufWriterState::Buffer(path, Vec::new()),
         }
     }
 
+    /// Override the maximum number of in-flight requests for this writer
+    ///
+    /// Defaults to 8
+    pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
+        Self {
+            max_concurrency,
+            ..self
+        }
+    }

Review Comment:
   If 0 is passed in, it will apply backpressure whenever there is any in-flight request
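
   The builder-style override in the diff above can be tried in isolation with a small sketch. `WriterConfig` and `with_capacity` are hypothetical names used only for illustration; the default of 8 and the `..self` struct-update pattern are taken from the diff.

```rust
/// Hypothetical configuration type mirroring the shape of the diff above;
/// it is not the real `BufWriter`.
#[derive(Debug, Clone, PartialEq)]
struct WriterConfig {
    capacity: usize,
    max_concurrency: usize,
}

impl WriterConfig {
    /// Create a config with the given buffer capacity and the default
    /// limit of 8 in-flight requests.
    fn with_capacity(capacity: usize) -> Self {
        Self {
            capacity,
            max_concurrency: 8,
        }
    }

    /// Override the maximum number of in-flight requests, keeping every
    /// other field unchanged via struct-update syntax.
    fn with_max_concurrency(self, max_concurrency: usize) -> Self {
        Self {
            max_concurrency,
            ..self
        }
    }
}
```

   Because the method takes `self` by value and returns `Self`, calls can be chained, for example `WriterConfig::with_capacity(1024).with_max_concurrency(2)`.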





Re: [PR] Implement MultipartStore for ThrottledStore [arrow-rs]

Posted by "Jefffrey (via GitHub)" <gi...@apache.org>.
Jefffrey commented on code in PR #5533:
URL: https://github.com/apache/arrow-rs/pull/5533#discussion_r1544040274


##########
object_store/src/buffered.rs:
##########
@@ -250,10 +251,21 @@ impl BufWriter {
         Self {
             capacity,
             store,
+            max_concurrency: 8,
             state: BufWriterState::Buffer(path, Vec::new()),
         }
     }
 
+    /// Override the maximum number of in-flight requests for this writer
+    ///
+    /// Defaults to 8
+    pub fn with_max_concurrency(self, max_concurrency: usize) -> Self {
+        Self {
+            max_concurrency,
+            ..self
+        }
+    }

Review Comment:
   What happens if 0 is passed in?





Re: [PR] Implement MultipartStore for ThrottledStore [arrow-rs]

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #5533:
URL: https://github.com/apache/arrow-rs/pull/5533#discussion_r1531589373


##########
object_store/src/upload.rs:
##########
@@ -110,31 +113,44 @@ pub struct WriteMultipart {
 impl WriteMultipart {
     /// Create a new [`WriteMultipart`] that will upload using 5MB chunks
     pub fn new(upload: Box<dyn MultipartUpload>) -> Self {
-        Self::new_with_capacity(upload, 5 * 1024 * 1024)
+        Self::new_with_chunk_size(upload, 5 * 1024 * 1024)
     }
 
-    /// Create a new [`WriteMultipart`] that will upload in fixed `capacity` sized chunks
-    pub fn new_with_capacity(upload: Box<dyn MultipartUpload>, capacity: usize) -> Self {
+    /// Create a new [`WriteMultipart`] that will upload in fixed `chunk_size` sized chunks
+    pub fn new_with_chunk_size(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
         Self {
             upload,
-            buffer: Vec::with_capacity(capacity),
+            buffer: Vec::with_capacity(chunk_size),
             tasks: Default::default(),
         }
     }
 
-    /// Wait until there are `max_concurrency` or fewer requests in-flight
-    pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
-        while self.tasks.len() > max_concurrency {
-            self.tasks.join_next().await.unwrap()??;
+    /// Polls for there to be less than `max_concurrency` [`UploadPart`] in progress

Review Comment:
   I've changed this to wait for fewer than `max_concurrency` requests in flight, rather than `max_concurrency` or fewer. I think this means that if you want to limit to `x` requests you poll for `x`, as opposed to `x - 1`.
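
   The effect of the comparison can be checked with a simplified, synchronous model. `InFlight`, `spawn`, and `peak_in_flight` are hypothetical names, and "completing" a task is modeled as popping it from a queue rather than awaiting a real upload. The `is_empty` guard is an assumption about how a limit of 0 is handled, chosen to match the reply elsewhere in this thread that backpressure applies whenever any request is in flight.

```rust
use std::collections::VecDeque;

/// Simplified model of a set of in-flight uploads (not the real task set).
struct InFlight {
    tasks: VecDeque<u32>,
}

impl InFlight {
    fn new() -> Self {
        Self {
            tasks: VecDeque::new(),
        }
    }

    /// Wait until fewer than `max_concurrency` tasks are in flight.
    /// The `is_empty` guard (an assumption) keeps a limit of 0 from
    /// looping forever: it simply waits for everything to finish.
    fn wait_for_capacity(&mut self, max_concurrency: usize) {
        while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
            // Model the oldest request completing.
            self.tasks.pop_front();
        }
    }

    /// Start a new request.
    fn spawn(&mut self, id: u32) {
        self.tasks.push_back(id);
    }
}

/// Submit `n` parts, waiting for capacity before each one, and return the
/// largest number of requests that were ever in flight at once.
fn peak_in_flight(n: u32, max_concurrency: usize) -> usize {
    let mut set = InFlight::new();
    let mut peak = 0;
    for id in 0..n {
        set.wait_for_capacity(max_concurrency);
        set.spawn(id);
        peak = peak.max(set.tasks.len());
    }
    peak
}
```

   In this model, waiting with a limit of `x` before each spawn keeps the peak at `x`. With the earlier `>` comparison the same loop would have allowed `x + 1` requests, which is why a caller would have needed to pass `x - 1`.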


