You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/28 09:55:32 UTC
[arrow-rs] branch master updated: object_store: Flush buffered multipart only during poll_shutdown (#3397)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8f9969246 object_store: Flush buffered multipart only during poll_shutdown (#3397)
8f9969246 is described below
commit 8f9969246d13741ead0cbcbb1cb7bab057ca15d2
Author: askoa <11...@users.noreply.github.com>
AuthorDate: Wed Dec 28 04:55:25 2022 -0500
object_store: Flush buffered multipart only during poll_shutdown (#3397)
Co-authored-by: askoa <as...@local>
---
object_store/src/multipart.rs | 56 ++++++++++++++++++++++++++++++-------------
1 file changed, 40 insertions(+), 16 deletions(-)
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index de8591462..65427d1f2 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -109,6 +109,43 @@ where
}
}
+impl<T> CloudMultiPartUpload<T>
+where
+ T: CloudMultiPartUploadImpl + Send + Sync,
+{
+ // The `poll_flush` function will only flush the in-progress tasks.
+ // The `final_flush` method called during `poll_shutdown` will flush
+ // the `current_buffer` along with in-progress tasks.
+ // Please see https://github.com/apache/arrow-rs/issues/3390 for more details.
+ fn final_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<(), io::Error>> {
+ // Poll current tasks
+ self.as_mut().poll_tasks(cx)?;
+
+ // If current_buffer is not empty, see if it can be submitted
+ if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency {
+ let out_buffer: Vec<u8> = std::mem::take(&mut self.current_buffer);
+ let inner = Arc::clone(&self.inner);
+ let part_idx = self.current_part_idx;
+ self.tasks.push(Box::pin(async move {
+ let upload_part = inner.put_multipart_part(out_buffer, part_idx).await?;
+ Ok((part_idx, upload_part))
+ }));
+ }
+
+ self.as_mut().poll_tasks(cx)?;
+
+ // If tasks and current_buffer are empty, return Ready
+ if self.tasks.is_empty() && self.current_buffer.is_empty() {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
+ }
+ }
+}
+
impl<T> AsyncWrite for CloudMultiPartUpload<T>
where
T: CloudMultiPartUploadImpl + Send + Sync,
@@ -158,21 +195,8 @@ where
// Poll current tasks
self.as_mut().poll_tasks(cx)?;
- // If current_buffer is not empty, see if it can be submitted
- if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency {
- let out_buffer: Vec<u8> = std::mem::take(&mut self.current_buffer);
- let inner = Arc::clone(&self.inner);
- let part_idx = self.current_part_idx;
- self.tasks.push(Box::pin(async move {
- let upload_part = inner.put_multipart_part(out_buffer, part_idx).await?;
- Ok((part_idx, upload_part))
- }));
- }
-
- self.as_mut().poll_tasks(cx)?;
-
- // If tasks and current_buffer are empty, return Ready
- if self.tasks.is_empty() && self.current_buffer.is_empty() {
+ // If tasks is empty, return Ready
+ if self.tasks.is_empty() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
@@ -184,7 +208,7 @@ where
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), io::Error>> {
// First, poll flush
- match self.as_mut().poll_flush(cx) {
+ match self.as_mut().final_flush(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(res) => res?,
};