You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/28 09:55:32 UTC

[arrow-rs] branch master updated: object_store: Flush buffered multipart only during poll_shutdown (#3397)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f9969246 object_store: Flush buffered multipart only during poll_shutdown (#3397)
8f9969246 is described below

commit 8f9969246d13741ead0cbcbb1cb7bab057ca15d2
Author: askoa <11...@users.noreply.github.com>
AuthorDate: Wed Dec 28 04:55:25 2022 -0500

    object_store: Flush buffered multipart only during poll_shutdown (#3397)
    
    Co-authored-by: askoa <as...@local>
---
 object_store/src/multipart.rs | 56 ++++++++++++++++++++++++++++++-------------
 1 file changed, 40 insertions(+), 16 deletions(-)

diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index de8591462..65427d1f2 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -109,6 +109,43 @@ where
     }
 }
 
+impl<T> CloudMultiPartUpload<T>
+where
+    T: CloudMultiPartUploadImpl + Send + Sync,
+{
+    // `poll_flush` only polls the in-progress upload tasks; it does not
+    // submit `current_buffer`. `final_flush`, which is called from
+    // `poll_shutdown`, also submits `current_buffer` before waiting on the tasks.
+    // Please see https://github.com/apache/arrow-rs/issues/3390 for more details.
+    fn final_flush(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        // Poll current tasks
+        self.as_mut().poll_tasks(cx)?;
+
+        // If current_buffer is not empty, see if it can be submitted
+        if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency {
+            let out_buffer: Vec<u8> = std::mem::take(&mut self.current_buffer);
+            let inner = Arc::clone(&self.inner);
+            let part_idx = self.current_part_idx;
+            self.tasks.push(Box::pin(async move {
+                let upload_part = inner.put_multipart_part(out_buffer, part_idx).await?;
+                Ok((part_idx, upload_part))
+            }));
+        }
+
+        self.as_mut().poll_tasks(cx)?;
+
+        // If tasks and current_buffer are empty, return Ready
+        if self.tasks.is_empty() && self.current_buffer.is_empty() {
+            Poll::Ready(Ok(()))
+        } else {
+            Poll::Pending
+        }
+    }
+}
+
 impl<T> AsyncWrite for CloudMultiPartUpload<T>
 where
     T: CloudMultiPartUploadImpl + Send + Sync,
@@ -158,21 +195,8 @@ where
         // Poll current tasks
         self.as_mut().poll_tasks(cx)?;
 
-        // If current_buffer is not empty, see if it can be submitted
-        if !self.current_buffer.is_empty() && self.tasks.len() < self.max_concurrency {
-            let out_buffer: Vec<u8> = std::mem::take(&mut self.current_buffer);
-            let inner = Arc::clone(&self.inner);
-            let part_idx = self.current_part_idx;
-            self.tasks.push(Box::pin(async move {
-                let upload_part = inner.put_multipart_part(out_buffer, part_idx).await?;
-                Ok((part_idx, upload_part))
-            }));
-        }
-
-        self.as_mut().poll_tasks(cx)?;
-
-        // If tasks and current_buffer are empty, return Ready
-        if self.tasks.is_empty() && self.current_buffer.is_empty() {
+        // If tasks is empty, return Ready
+        if self.tasks.is_empty() {
             Poll::Ready(Ok(()))
         } else {
             Poll::Pending
@@ -184,7 +208,7 @@ where
         cx: &mut std::task::Context<'_>,
     ) -> Poll<Result<(), io::Error>> {
         // First, poll flush
-        match self.as_mut().poll_flush(cx) {
+        match self.as_mut().final_flush(cx) {
             Poll::Pending => return Poll::Pending,
             Poll::Ready(res) => res?,
         };