You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/04/28 10:35:58 UTC

[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4153: InMemory append API

tustvold commented on code in PR #4153:
URL: https://github.com/apache/arrow-rs/pull/4153#discussion_r1180246497


##########
object_store/src/memory.rs:
##########
@@ -329,8 +340,59 @@ impl AsyncWrite for InMemoryUpload {
     }
 }
 
+struct InMemoryAppend {
+    location: Path,
+    data: Vec<u8>,
+    storage: StorageType,
+}
+
+impl AsyncWrite for InMemoryAppend {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        self.data.extend_from_slice(buf);
+        Poll::Ready(Ok(buf.len()))
+    }
+
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        let storage = StorageType::clone(&self.storage);
+
+        let mut writer = storage.write();
+
+        if let Some((bytes, _)) = writer.remove(&self.location) {

Review Comment:
   It might be more idiomatic to use [`entry`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.entry)



##########
object_store/src/memory.rs:
##########
@@ -329,8 +340,59 @@ impl AsyncWrite for InMemoryUpload {
     }
 }
 
+struct InMemoryAppend {
+    location: Path,
+    data: Vec<u8>,
+    storage: StorageType,
+}
+
+impl AsyncWrite for InMemoryAppend {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        self.data.extend_from_slice(buf);
+        Poll::Ready(Ok(buf.len()))
+    }
+
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        let storage = StorageType::clone(&self.storage);
+
+        let mut writer = storage.write();
+
+        if let Some((bytes, _)) = writer.remove(&self.location) {
+            let buf = std::mem::take(&mut self.data);
+            let concat = Bytes::from_iter(bytes.into_iter().chain(buf.into_iter()));
+            writer.insert(self.location.clone(), (concat, Utc::now()));
+        } else {
+            writer.insert(
+                self.location.clone(),
+                (Bytes::from(std::mem::take(&mut self.data)), Utc::now()),
+            );
+        };
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // does nothing different than flush
+        match self.poll_flush(_cx) {
+            Poll::Ready(_) => Poll::Ready(Ok(())),
+            Poll::Pending => Poll::Pending,
+        }

Review Comment:
   ```suggestion
           cx: &mut std::task::Context<'_>,
       ) -> std::task::Poll<Result<(), io::Error>> {
           self.poll_flush(cx)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org