You are viewing a plain text version of this content; the canonical link to the original message is available in the HTML version of the archive.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/12 19:47:29 UTC
[arrow-datafusion] branch master updated: Ballista: Shuffle write
bug fix (#714)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 24f32ca Ballista: Shuffle write bug fix (#714)
24f32ca is described below
commit 24f32caea26f7a574da5bf9ae35f60e87057f6f9
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon Jul 12 13:47:21 2021 -0600
Ballista: Shuffle write bug fix (#714)
* shuffle write bug fix
* Rename variable
* windows
* fix bug in windows-specific assertion
* revert accidental change
---
.../core/src/execution_plans/shuffle_writer.rs | 33 ++++++++++++++++------
1 file changed, 24 insertions(+), 9 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 92b4448..83d40ae 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -234,7 +234,7 @@ impl ExecutionPlan for ShuffleWriterExec {
indices[(*hash % num_output_partitions as u64) as usize]
.push(index as u64)
}
- for (num_output_partition, partition_indices) in
+ for (output_partition, partition_indices) in
indices.into_iter().enumerate()
{
let indices = partition_indices.into();
@@ -254,13 +254,13 @@ impl ExecutionPlan for ShuffleWriterExec {
// write batch out
let start = Instant::now();
- match &mut writers[num_output_partition] {
+ match &mut writers[output_partition] {
Some(w) => {
w.write(&output_batch)?;
}
None => {
let mut path = path.clone();
- path.push(&format!("{}", partition));
+ path.push(&format!("{}", output_partition));
std::fs::create_dir_all(&path)?;
path.push("data.arrow");
@@ -271,7 +271,7 @@ impl ExecutionPlan for ShuffleWriterExec {
ShuffleWriter::new(path, stream.schema().as_ref())?;
writer.write(&output_batch)?;
- writers[num_output_partition] = Some(writer);
+ writers[output_partition] = Some(writer);
}
}
self.metrics.write_time.add_elapsed(start);
@@ -419,20 +419,22 @@ impl ShuffleWriter {
mod tests {
use super::*;
use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
+ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::expressions::Column;
+ use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::memory::MemoryExec;
use tempfile::TempDir;
#[tokio::test]
async fn test() -> Result<()> {
- let input_plan = create_input_plan()?;
+ let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?));
let work_dir = TempDir::new()?;
let query_stage = ShuffleWriterExec::try_new(
"jobOne".to_owned(),
1,
input_plan,
work_dir.into_path().to_str().unwrap().to_owned(),
- None,
+ Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
)?;
let mut stream = query_stage.execute(0).await?;
let batches = utils::collect_stream(&mut stream)
@@ -441,17 +443,28 @@ mod tests {
assert_eq!(1, batches.len());
let batch = &batches[0];
assert_eq!(3, batch.num_columns());
- assert_eq!(1, batch.num_rows());
+ assert_eq!(2, batch.num_rows());
let path = batch.columns()[1]
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
- let file = path.value(0);
- assert!(file.ends_with("data.arrow"));
+
+ let file0 = path.value(0);
+ assert!(
+ file0.ends_with("/jobOne/1/0/data.arrow")
+ || file0.ends_with("\\jobOne\\1\\0\\data.arrow")
+ );
+ let file1 = path.value(1);
+ assert!(
+ file1.ends_with("/jobOne/1/1/data.arrow")
+ || file1.ends_with("\\jobOne\\1\\1\\data.arrow")
+ );
+
let stats = batch.columns()[2]
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
+
let num_rows = stats
.column_by_name("num_rows")
.unwrap()
@@ -459,6 +472,8 @@ mod tests {
.downcast_ref::<UInt64Array>()
.unwrap();
assert_eq!(4, num_rows.value(0));
+ assert_eq!(4, num_rows.value(1));
+
Ok(())
}