Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/04 18:32:21 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6154: Simplify MemoryWriteExec

alamb commented on code in PR #6154:
URL: https://github.com/apache/arrow-datafusion/pull/6154#discussion_r1185368790


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -327,24 +326,25 @@ impl ExecutionPlan for MemoryWriteExec {
     ) -> Result<SendableRecordBatchStream> {
         let batch_count = self.batches.len();
         let data = self.input.execute(partition, context)?;
-        if batch_count >= self.input.output_partitioning().partition_count() {
-            // If the number of input partitions matches the number of MemTable partitions,
-            // use a lightweight implementation that doesn't utilize as many locks.
-            let table_partition = self.batches[partition].clone();
-            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        } else {
-            // Otherwise, use the locked implementation.
-            let table_partition = self.batches[partition % batch_count].clone();
-            Ok(Box::pin(MemorySinkStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        }
+        let schema = self.schema.clone();
+        let state = (data, self.batches[partition % batch_count].clone());
+
+        let stream = futures::stream::unfold(state, |mut state| async move {
+            // hold lock during the entire write
+            let mut locked = state.1.write().await;
+            loop {
+                let batch = match state.0.next().await {
+                    Some(Ok(batch)) => batch,
+                    Some(Err(e)) => {
+                        drop(locked);
+                        return Some((Err(e), state));
+                    }
+                    None => return None,
+                };
+                locked.push(batch)
+            }
+        });
+        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))

Review Comment:
   After thinking about this more, I don't understand why we are locking at all in the loop. If we are really worried about locking performance, we can just buffer the batches locally and copy them into the table partition at the end of the stream. This should also keep the code simpler. I'll make a follow-on PR with a proposal.
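The buffering idea in the comment can be sketched as follows. This is a minimal, synchronous sketch under stated assumptions, not the code of the follow-on PR. It uses `std::sync::RwLock` in place of the async lock awaited in the diff and a plain iterator of `Result`s in place of the `SendableRecordBatchStream`. `Batch`, `Partition` and `write_buffered` are hypothetical names: `Batch` stands in for Arrow's `RecordBatch`, and `Partition` stands in for one MemTable partition. The all-or-nothing error handling is also an assumption. The diff, by contrast, pushes each batch as soon as it arrives.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for an Arrow `RecordBatch` (not the real type).
#[derive(Debug, Clone, PartialEq)]
pub struct Batch(pub Vec<i64>);

/// Hypothetical stand-in for one MemTable partition: a shared, lockable list of batches.
pub type Partition = Arc<RwLock<Vec<Batch>>>;

/// Drains `input` into a local buffer without touching the partition lock,
/// then takes the write lock exactly once to append everything.
///
/// Assumption: on the first error nothing is appended and the error is
/// returned (all-or-nothing), unlike the diff, which pushes each batch as
/// it arrives.
pub fn write_buffered<I, E>(input: I, partition: &Partition) -> Result<usize, E>
where
    I: IntoIterator<Item = Result<Batch, E>>,
{
    // Phase 1: buffer locally; no lock is held while the input is consumed.
    let buffered = input.into_iter().collect::<Result<Vec<Batch>, E>>()?;
    let count = buffered.len();

    // Phase 2: a single short critical section once the input is exhausted.
    let mut locked = partition.write().expect("partition lock poisoned");
    locked.extend(buffered);
    Ok(count)
}

fn main() {
    let partition: Partition = Arc::new(RwLock::new(Vec::new()));

    let input: Vec<Result<Batch, String>> = vec![Ok(Batch(vec![1, 2])), Ok(Batch(vec![3]))];
    let written = write_buffered(input, &partition).unwrap();
    // prints: wrote 2 batches, partition holds 2
    println!("wrote {written} batches, partition holds {}", partition.read().unwrap().len());

    // An error anywhere in the input leaves the partition untouched.
    let failing: Vec<Result<Batch, String>> = vec![Ok(Batch(vec![4])), Err("boom".to_string())];
    assert!(write_buffered(failing, &partition).is_err());
    assert_eq!(partition.read().unwrap().len(), 2);
}
```

Because the write lock is taken only for the final `extend`, readers of the partition are never blocked while the input is still being produced. The trade-off is that none of the new batches become visible until the whole input has been consumed.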



-- 
This is an automated message from the Apache Git Service.