Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/04/07 11:18:59 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5894: Use interleave in BatchBuilder (~10% faster merge)

alamb commented on code in PR #5894:
URL: https://github.com/apache/arrow-datafusion/pull/5894#discussion_r1160635957


##########
datafusion/core/src/physical_plan/sorts/builder.rs:
##########
@@ -89,82 +99,37 @@ impl BatchBuilder {
             return Ok(None);
         }
 
-        // Mapping from stream index to the index of the first buffer from that stream
-        let mut buffer_idx = 0;
-        let mut stream_to_buffer_idx = Vec::with_capacity(self.batches.len());
-
-        for batches in &self.batches {
-            stream_to_buffer_idx.push(buffer_idx);
-            buffer_idx += batches.len();
-        }
-
-        let columns = self
-            .schema
-            .fields()
-            .iter()
-            .enumerate()
-            .map(|(column_idx, field)| {
-                let arrays = self
+        let columns = (0..self.schema.fields.len())
+            .map(|column_idx| {
+                let arrays: Vec<_> = self
                     .batches
                     .iter()
-                    .flat_map(|batch| {
-                        batch.iter().map(|batch| batch.column(column_idx).data())
-                    })
+                    .map(|(_, batch)| batch.column(column_idx).as_ref())
                     .collect();
-
-                let mut array_data = MutableArrayData::new(
-                    arrays,
-                    field.is_nullable(),
-                    self.indices.len(),
-                );
-
-                let first = &self.indices[0];
-                let mut buffer_idx =
-                    stream_to_buffer_idx[first.stream_idx] + first.batch_idx;
-                let mut start_row_idx = first.row_idx;
-                let mut end_row_idx = start_row_idx + 1;
-
-                for row_index in self.indices.iter().skip(1) {
-                    let next_buffer_idx =
-                        stream_to_buffer_idx[row_index.stream_idx] + row_index.batch_idx;
-
-                    if next_buffer_idx == buffer_idx && row_index.row_idx == end_row_idx {
-                        // subsequent row in same batch
-                        end_row_idx += 1;
-                        continue;
-                    }
-
-                    // emit current batch of rows for current buffer
-                    array_data.extend(buffer_idx, start_row_idx, end_row_idx);
-
-                    // start new batch of rows
-                    buffer_idx = next_buffer_idx;
-                    start_row_idx = row_index.row_idx;
-                    end_row_idx = start_row_idx + 1;
-                }
-
-                // emit final batch of rows
-                array_data.extend(buffer_idx, start_row_idx, end_row_idx);
-                make_array(array_data.freeze())
+                Ok(interleave(&arrays, &self.indices)?)

Review Comment:
   👌 


