You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "metesynnada (via GitHub)" <gi...@apache.org> on 2023/04/28 14:40:36 UTC

[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #6154: Simplify MemoryWriteExec

metesynnada commented on code in PR #6154:
URL: https://github.com/apache/arrow-datafusion/pull/6154#discussion_r1180493624


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -327,24 +326,20 @@ impl ExecutionPlan for MemoryWriteExec {
     ) -> Result<SendableRecordBatchStream> {
         let batch_count = self.batches.len();
         let data = self.input.execute(partition, context)?;
-        if batch_count >= self.input.output_partitioning().partition_count() {
-            // If the number of input partitions matches the number of MemTable partitions,
-            // use a lightweight implementation that doesn't utilize as many locks.
-            let table_partition = self.batches[partition].clone();
-            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        } else {
-            // Otherwise, use the locked implementation.
-            let table_partition = self.batches[partition % batch_count].clone();
-            Ok(Box::pin(MemorySinkStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        }
+        let schema = self.schema.clone();
+        let state = (data, self.batches[partition % batch_count].clone());
+
+        let stream = futures::stream::unfold(state, |mut state| async move {

Review Comment:
   Now I get what you mean, if we do not hold the async lock in the state as acquired, the folding becomes possible. This substantially shrinks the code size, cool pattern.



##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -327,24 +326,20 @@ impl ExecutionPlan for MemoryWriteExec {
     ) -> Result<SendableRecordBatchStream> {
         let batch_count = self.batches.len();
         let data = self.input.execute(partition, context)?;
-        if batch_count >= self.input.output_partitioning().partition_count() {
-            // If the number of input partitions matches the number of MemTable partitions,
-            // use a lightweight implementation that doesn't utilize as many locks.
-            let table_partition = self.batches[partition].clone();
-            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        } else {
-            // Otherwise, use the locked implementation.
-            let table_partition = self.batches[partition % batch_count].clone();
-            Ok(Box::pin(MemorySinkStream::try_new(
-                table_partition,
-                data,
-                self.schema.clone(),
-            )?))
-        }
+        let schema = self.schema.clone();
+        let state = (data, self.batches[partition % batch_count].clone());

Review Comment:
   There is a performance difference of batch_count * acquire_lock, if this is OK, we can move on to this. Do you think it would affect the benchmarks? I am not quite familiar with that part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org