Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/12 19:47:29 UTC

[arrow-datafusion] branch master updated: Ballista: Shuffle write bug fix (#714)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 24f32ca  Ballista: Shuffle write bug fix (#714)
24f32ca is described below

commit 24f32caea26f7a574da5bf9ae35f60e87057f6f9
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon Jul 12 13:47:21 2021 -0600

    Ballista: Shuffle write bug fix (#714)
    
    * shuffle write bug fix
    
    * Rename variable
    
    * windows
    
    * fix bug in windows-specific assertion
    
    * revert accidental change
---
 .../core/src/execution_plans/shuffle_writer.rs     | 33 ++++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)
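
For context on the fix below: the shuffle writer hashes each input row into
one of N output partitions and lazily creates one writer per output
partition. Before this patch, the directory for a newly created writer was
named after `partition` (the input partition being executed) instead of the
output partition index, so batches destined for different output partitions
could land in the wrong directory. A minimal sketch of the corrected
indexing pattern, with std::fs::File standing in for Ballista's
ShuffleWriter and a simplified row type (an illustration, not the actual
code):

    use std::fs::File;
    use std::io::Write;
    use std::path::PathBuf;

    fn write_partitioned(
        base: PathBuf,
        // rows already hashed into one bucket per output partition
        rows_by_output_partition: Vec<Vec<String>>,
    ) -> std::io::Result<()> {
        let mut writers: Vec<Option<File>> =
            (0..rows_by_output_partition.len()).map(|_| None).collect();

        for (output_partition, rows) in
            rows_by_output_partition.into_iter().enumerate()
        {
            match &mut writers[output_partition] {
                Some(w) => {
                    for row in &rows {
                        writeln!(w, "{}", row)?;
                    }
                }
                None => {
                    // The fix: name the directory after the *output*
                    // partition index, not the input partition.
                    let mut path = base.clone();
                    path.push(format!("{}", output_partition));
                    std::fs::create_dir_all(&path)?;
                    path.push("data.arrow");
                    let mut w = File::create(&path)?;
                    for row in &rows {
                        writeln!(w, "{}", row)?;
                    }
                    writers[output_partition] = Some(w);
                }
            }
        }
        Ok(())
    }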

diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 92b4448..83d40ae 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -234,7 +234,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                         indices[(*hash % num_output_partitions as u64) as usize]
                             .push(index as u64)
                     }
-                    for (num_output_partition, partition_indices) in
+                    for (output_partition, partition_indices) in
                         indices.into_iter().enumerate()
                     {
                         let indices = partition_indices.into();
@@ -254,13 +254,13 @@ impl ExecutionPlan for ShuffleWriterExec {
 
                         // write batch out
                         let start = Instant::now();
-                        match &mut writers[num_output_partition] {
+                        match &mut writers[output_partition] {
                             Some(w) => {
                                 w.write(&output_batch)?;
                             }
                             None => {
                                 let mut path = path.clone();
-                                path.push(&format!("{}", partition));
+                                path.push(&format!("{}", output_partition));
                                 std::fs::create_dir_all(&path)?;
 
                                 path.push("data.arrow");
@@ -271,7 +271,7 @@ impl ExecutionPlan for ShuffleWriterExec {
                                     ShuffleWriter::new(path, stream.schema().as_ref())?;
 
                                 writer.write(&output_batch)?;
-                                writers[num_output_partition] = Some(writer);
+                                writers[output_partition] = Some(writer);
                             }
                         }
                         self.metrics.write_time.add_elapsed(start);
@@ -419,20 +419,22 @@ impl ShuffleWriter {
 mod tests {
     use super::*;
     use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
+    use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use datafusion::physical_plan::expressions::Column;
+    use datafusion::physical_plan::limit::GlobalLimitExec;
     use datafusion::physical_plan::memory::MemoryExec;
     use tempfile::TempDir;
 
     #[tokio::test]
     async fn test() -> Result<()> {
-        let input_plan = create_input_plan()?;
+        let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?));
         let work_dir = TempDir::new()?;
         let query_stage = ShuffleWriterExec::try_new(
             "jobOne".to_owned(),
             1,
             input_plan,
             work_dir.into_path().to_str().unwrap().to_owned(),
-            None,
+            Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
         let mut stream = query_stage.execute(0).await?;
         let batches = utils::collect_stream(&mut stream)
@@ -441,17 +443,28 @@ mod tests {
         assert_eq!(1, batches.len());
         let batch = &batches[0];
         assert_eq!(3, batch.num_columns());
-        assert_eq!(1, batch.num_rows());
+        assert_eq!(2, batch.num_rows());
         let path = batch.columns()[1]
             .as_any()
             .downcast_ref::<StringArray>()
             .unwrap();
-        let file = path.value(0);
-        assert!(file.ends_with("data.arrow"));
+
+        let file0 = path.value(0);
+        assert!(
+            file0.ends_with("/jobOne/1/0/data.arrow")
+                || file0.ends_with("\\jobOne\\1\\0\\data.arrow")
+        );
+        let file1 = path.value(1);
+        assert!(
+            file1.ends_with("/jobOne/1/1/data.arrow")
+                || file1.ends_with("\\jobOne\\1\\1\\data.arrow")
+        );
+
         let stats = batch.columns()[2]
             .as_any()
             .downcast_ref::<StructArray>()
             .unwrap();
+
         let num_rows = stats
             .column_by_name("num_rows")
             .unwrap()
@@ -459,6 +472,8 @@ mod tests {
             .downcast_ref::<UInt64Array>()
             .unwrap();
         assert_eq!(4, num_rows.value(0));
+        assert_eq!(4, num_rows.value(1));
+
         Ok(())
     }
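
A note on the test changes above: the input plan is now wrapped in
CoalescePartitionsExec and the stage requests Partitioning::Hash on column
"a" with 2 output partitions, so the result batch carries one metadata row
per output partition (hence the num_rows assertion of 2 and the
per-partition path assertions). Those assertions hard-code both "/" and
"\\" so they pass on Unix and Windows. As a sketch of an alternative (not
part of this commit), a helper can compare path suffixes component-wise via
std::path::Path, avoiding separator literals; `ends_with_components` is a
hypothetical name:

    use std::path::Path;

    /// True if `path` ends with the given components, regardless of the
    /// platform's separator. A sketch, not code from this commit.
    fn ends_with_components(path: &str, suffix: &[&str]) -> bool {
        let mut parts: Vec<String> = Path::new(path)
            .components()
            .map(|c| c.as_os_str().to_string_lossy().into_owned())
            .collect();
        if parts.len() < suffix.len() {
            return false;
        }
        parts.split_off(parts.len() - suffix.len()) == suffix
    }

    fn main() {
        assert!(ends_with_components(
            "/tmp/xyz/jobOne/1/0/data.arrow",
            &["jobOne", "1", "0", "data.arrow"],
        ));
    }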