You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2022/04/20 18:29:04 UTC

[arrow-datafusion] branch master updated: Add BatchPartitioner (#2285) (#2287)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 583b4ab8d Add BatchPartitioner (#2285) (#2287)
583b4ab8d is described below

commit 583b4ab8dfe6148a7387841d112dd50b1151f6fb
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Apr 20 19:28:25 2022 +0100

    Add BatchPartitioner (#2285) (#2287)
    
    * Add BatchPartitioner (#2285)
    
    * Don't panic on unknown partitioning scheme
    
    * Review feedback
---
 .../core/src/execution_plans/shuffle_writer.rs     | 109 ++++------
 datafusion/core/src/physical_plan/metrics/value.rs |   5 +
 datafusion/core/src/physical_plan/repartition.rs   | 239 +++++++++++++--------
 3 files changed, 194 insertions(+), 159 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 77190c5ab..7a87406af 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -34,16 +34,13 @@ use crate::serde::protobuf::ShuffleWritePartition;
 use crate::serde::scheduler::PartitionStats;
 use async_trait::async_trait;
 use datafusion::arrow::array::{
-    Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
-    UInt64Builder,
+    ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, UInt64Builder,
 };
-use datafusion::arrow::compute::take;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::common::IPCWriter;
-use datafusion::physical_plan::hash_utils::create_hashes;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::metrics::{
     self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
@@ -55,6 +52,7 @@ use datafusion::physical_plan::{
 use futures::StreamExt;
 
 use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::repartition::BatchPartitioner;
 use log::{debug, info};
 
 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -81,6 +79,7 @@ pub struct ShuffleWriterExec {
 struct ShuffleWriteMetrics {
     /// Time spent writing batches to shuffle files
     write_time: metrics::Time,
+    repart_time: metrics::Time,
     input_rows: metrics::Count,
     output_rows: metrics::Count,
 }
@@ -88,6 +87,8 @@ struct ShuffleWriteMetrics {
 impl ShuffleWriteMetrics {
     fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
         let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition);
+        let repart_time =
+            MetricBuilder::new(metrics).subset_time("repart_time", partition);
 
         let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
 
@@ -95,6 +96,7 @@ impl ShuffleWriteMetrics {
 
         Self {
             write_time,
+            repart_time,
             input_rows,
             output_rows,
         }
@@ -202,77 +204,48 @@ impl ShuffleWriterExec {
                     writers.push(None);
                 }
 
-                let hashes_buf = &mut vec![];
-                let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+                let mut partitioner = BatchPartitioner::try_new(
+                    Partitioning::Hash(exprs.clone(), *n),
+                    write_metrics.repart_time.clone(),
+                )?;
 
                 while let Some(result) = stream.next().await {
                     let input_batch = result?;
 
                     write_metrics.input_rows.add(input_batch.num_rows());
 
-                    let arrays = exprs
-                        .iter()
-                        .map(|expr| {
-                            Ok(expr
-                                .evaluate(&input_batch)?
-                                .into_array(input_batch.num_rows()))
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-                    hashes_buf.clear();
-                    hashes_buf.resize(arrays[0].len(), 0);
-                    // Hash arrays and compute buckets based on number of partitions
-                    let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
-                    let mut indices = vec![vec![]; num_output_partitions];
-                    for (index, hash) in hashes.iter().enumerate() {
-                        indices[(*hash % num_output_partitions as u64) as usize]
-                            .push(index as u64)
-                    }
-                    for (output_partition, partition_indices) in
-                        indices.into_iter().enumerate()
-                    {
-                        let indices = partition_indices.into();
-
-                        // Produce batches based on indices
-                        let columns = input_batch
-                            .columns()
-                            .iter()
-                            .map(|c| {
-                                take(c.as_ref(), &indices, None).map_err(|e| {
-                                    DataFusionError::Execution(e.to_string())
-                                })
-                            })
-                            .collect::<Result<Vec<Arc<dyn Array>>>>()?;
-
-                        let output_batch =
-                            RecordBatch::try_new(input_batch.schema(), columns)?;
-
-                        // write non-empty batch out
-
-                        // TODO optimize so we don't write or fetch empty partitions
-                        // if output_batch.num_rows() > 0 {
-                        let timer = write_metrics.write_time.timer();
-                        match &mut writers[output_partition] {
-                            Some(w) => {
-                                w.write(&output_batch)?;
+                    partitioner.partition(
+                        input_batch,
+                        |output_partition, output_batch| {
+                            // write non-empty batch out
+
+                            // TODO optimize so we don't write or fetch empty partitions
+                            // if output_batch.num_rows() > 0 {
+                            let timer = write_metrics.write_time.timer();
+                            match &mut writers[output_partition] {
+                                Some(w) => {
+                                    w.write(&output_batch)?;
+                                }
+                                None => {
+                                    let mut path = path.clone();
+                                    path.push(&format!("{}", output_partition));
+                                    std::fs::create_dir_all(&path)?;
+
+                                    path.push(format!("data-{}.arrow", input_partition));
+                                    info!("Writing results to {:?}", path);
+
+                                    let mut writer =
+                                        IPCWriter::new(&path, stream.schema().as_ref())?;
+
+                                    writer.write(&output_batch)?;
+                                    writers[output_partition] = Some(writer);
+                                }
                             }
-                            None => {
-                                let mut path = path.clone();
-                                path.push(&format!("{}", output_partition));
-                                std::fs::create_dir_all(&path)?;
-
-                                path.push(format!("data-{}.arrow", input_partition));
-                                info!("Writing results to {:?}", path);
-
-                                let mut writer =
-                                    IPCWriter::new(&path, stream.schema().as_ref())?;
-
-                                writer.write(&output_batch)?;
-                                writers[output_partition] = Some(writer);
-                            }
-                        }
-                        write_metrics.output_rows.add(output_batch.num_rows());
-                        timer.done();
-                    }
+                            write_metrics.output_rows.add(output_batch.num_rows());
+                            timer.done();
+                            Ok(())
+                        },
+                    )?;
                 }
 
                 let mut part_locs = vec![];
diff --git a/datafusion/core/src/physical_plan/metrics/value.rs b/datafusion/core/src/physical_plan/metrics/value.rs
index ffb4ebb3f..4bf92221f 100644
--- a/datafusion/core/src/physical_plan/metrics/value.rs
+++ b/datafusion/core/src/physical_plan/metrics/value.rs
@@ -300,6 +300,11 @@ impl<'a> ScopedTimerGuard<'a> {
         }
     }
 
+    /// Restarts the timer recording from the current time
+    pub fn restart(&mut self) {
+        self.start = Some(Instant::now())
+    }
+
     /// Stop the timer, record the time taken and consume self
     pub fn done(mut self) {
         self.stop()
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 036421637..379555396 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -26,9 +26,10 @@ use std::{any::Any, vec};
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
+use arrow::array::{ArrayRef, UInt64Builder};
+use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::Array, error::Result as ArrowResult};
-use arrow::{compute::take, datatypes::SchemaRef};
 use log::debug;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 
@@ -39,6 +40,7 @@ use super::{RecordBatchStream, SendableRecordBatchStream};
 use async_trait::async_trait;
 
 use crate::execution::context::TaskContext;
+use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::Stream;
 use futures::StreamExt;
 use hashbrown::HashMap;
@@ -62,6 +64,133 @@ struct RepartitionExecState {
     abort_helper: Arc<AbortOnDropMany<()>>,
 }
 
+/// A utility that can be used to partition batches based on [`Partitioning`]
+pub struct BatchPartitioner {
+    state: BatchPartitionerState,
+    timer: metrics::Time,
+}
+
+enum BatchPartitionerState {
+    Hash {
+        random_state: ahash::RandomState,
+        exprs: Vec<Arc<dyn PhysicalExpr>>,
+        num_partitions: usize,
+        hash_buffer: Vec<u64>,
+    },
+    RoundRobin {
+        num_partitions: usize,
+        next_idx: usize,
+    },
+}
+
+impl BatchPartitioner {
+    /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
+    ///
+    /// The time spent repartitioning will be recorded to `timer`
+    pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result<Self> {
+        let state = match partitioning {
+            Partitioning::RoundRobinBatch(num_partitions) => {
+                BatchPartitionerState::RoundRobin {
+                    num_partitions,
+                    next_idx: 0,
+                }
+            }
+            Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
+                exprs,
+                num_partitions,
+                // Use fixed random hash
+                random_state: ahash::RandomState::with_seeds(0, 0, 0, 0),
+                hash_buffer: vec![],
+            },
+            other => {
+                return Err(DataFusionError::NotImplemented(format!(
+                    "Unsupported repartitioning scheme {:?}",
+                    other
+                )))
+            }
+        };
+
+        Ok(Self { state, timer })
+    }
+
+    /// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
+    /// based on the [`Partitioning`] specified on construction
+    ///
+    /// `f` will be called for each partitioned [`RecordBatch`] with the corresponding
+    /// partition index. Any error returned by `f` will be immediately returned by this
+    /// function without attempting to publish further [`RecordBatch`]
+    ///
+    /// The time spent repartitioning, not including time spent in `f`, will be recorded
+    /// to the [`metrics::Time`] provided on construction
+    pub fn partition<F>(&mut self, batch: RecordBatch, mut f: F) -> Result<()>
+    where
+        F: FnMut(usize, RecordBatch) -> Result<()>,
+    {
+        match &mut self.state {
+            BatchPartitionerState::RoundRobin {
+                num_partitions,
+                next_idx,
+            } => {
+                let idx = *next_idx;
+                *next_idx = (*next_idx + 1) % *num_partitions;
+                f(idx, batch)?;
+            }
+            BatchPartitionerState::Hash {
+                random_state,
+                exprs,
+                num_partitions: partitions,
+                hash_buffer,
+            } => {
+                let mut timer = self.timer.timer();
+
+                let arrays = exprs
+                    .iter()
+                    .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+                    .collect::<Result<Vec<_>>>()?;
+
+                hash_buffer.clear();
+                hash_buffer.resize(batch.num_rows(), 0);
+
+                create_hashes(&arrays, random_state, hash_buffer)?;
+
+                let mut indices: Vec<_> = (0..*partitions)
+                    .map(|_| UInt64Builder::new(batch.num_rows()))
+                    .collect();
+
+                for (index, hash) in hash_buffer.iter().enumerate() {
+                    indices[(*hash % *partitions as u64) as usize]
+                        .append_value(index as u64)
+                        .unwrap();
+                }
+
+                for (partition, mut indices) in indices.into_iter().enumerate() {
+                    let indices = indices.finish();
+                    if indices.is_empty() {
+                        continue;
+                    }
+
+                    // Produce batches based on indices
+                    let columns = batch
+                        .columns()
+                        .iter()
+                        .map(|c| {
+                            arrow::compute::take(c.as_ref(), &indices, None)
+                                .map_err(DataFusionError::ArrowError)
+                        })
+                        .collect::<Result<Vec<ArrayRef>>>()?;
+
+                    let batch = RecordBatch::try_new(batch.schema(), columns).unwrap();
+
+                    timer.stop();
+                    f(partition, batch)?;
+                    timer.restart();
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
 /// The repartition operator maps N input partitions to M output partitions based on a
 /// partitioning scheme. No guarantees are made about the order of the resulting partitions.
 #[derive(Debug)]
@@ -199,8 +328,6 @@ impl ExecutionPlan for RepartitionExec {
                     mpsc::unbounded_channel::<Option<ArrowResult<RecordBatch>>>();
                 state.channels.insert(partition, (sender, receiver));
             }
-            // Use fixed random state
-            let random = ahash::RandomState::with_seeds(0, 0, 0, 0);
 
             // launch one async task per *input* partition
             let mut join_handles = Vec::with_capacity(num_input_partitions);
@@ -215,7 +342,6 @@ impl ExecutionPlan for RepartitionExec {
 
                 let input_task: JoinHandle<Result<()>> =
                     tokio::spawn(Self::pull_from_input(
-                        random.clone(),
                         self.input.clone(),
                         i,
                         txs.clone(),
@@ -299,7 +425,6 @@ impl RepartitionExec {
     ///
     /// txs hold the output sending channels for each output partition
     async fn pull_from_input(
-        random_state: ahash::RandomState,
         input: Arc<dyn ExecutionPlan>,
         i: usize,
         mut txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
@@ -307,16 +432,14 @@ impl RepartitionExec {
         r_metrics: RepartitionMetrics,
         context: Arc<TaskContext>,
     ) -> Result<()> {
-        let num_output_partitions = txs.len();
+        let mut partitioner =
+            BatchPartitioner::try_new(partitioning, r_metrics.repart_time.clone())?;
 
         // execute the child operator
         let timer = r_metrics.fetch_time.timer();
         let mut stream = input.execute(i, context).await?;
         timer.done();
 
-        let mut counter = 0;
-        let hashes_buf = &mut vec![];
-
         // While there are still outputs to send to, keep
         // pulling inputs
         while !txs.is_empty() {
@@ -326,89 +449,23 @@ impl RepartitionExec {
             timer.done();
 
             // Input is done
-            if result.is_none() {
-                break;
-            }
-            let result: ArrowResult<RecordBatch> = result.unwrap();
-
-            match &partitioning {
-                Partitioning::RoundRobinBatch(_) => {
-                    let timer = r_metrics.send_time.timer();
-                    let output_partition = counter % num_output_partitions;
-                    // if there is still a receiver, send to it
-                    if let Some(tx) = txs.get_mut(&output_partition) {
-                        if tx.send(Some(result)).is_err() {
-                            // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
-                            txs.remove(&output_partition);
-                        }
+            let batch = match result {
+                Some(result) => result?,
+                None => break,
+            };
+
+            partitioner.partition(batch, |partition, partitioned| {
+                let timer = r_metrics.send_time.timer();
+                // if there is still a receiver, send to it
+                if let Some(tx) = txs.get_mut(&partition) {
+                    if tx.send(Some(Ok(partitioned))).is_err() {
+                        // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
+                        txs.remove(&partition);
                     }
-                    timer.done();
                 }
-                Partitioning::Hash(exprs, _) => {
-                    let timer = r_metrics.repart_time.timer();
-                    let input_batch = result?;
-                    let arrays = exprs
-                        .iter()
-                        .map(|expr| {
-                            Ok(expr
-                                .evaluate(&input_batch)?
-                                .into_array(input_batch.num_rows()))
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-                    hashes_buf.clear();
-                    hashes_buf.resize(arrays[0].len(), 0);
-                    // Hash arrays and compute buckets based on number of partitions
-                    let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
-                    let mut indices = vec![vec![]; num_output_partitions];
-                    for (index, hash) in hashes.iter().enumerate() {
-                        indices[(*hash % num_output_partitions as u64) as usize]
-                            .push(index as u64)
-                    }
-                    timer.done();
-
-                    for (num_output_partition, partition_indices) in
-                        indices.into_iter().enumerate()
-                    {
-                        if partition_indices.is_empty() {
-                            continue;
-                        }
-                        let timer = r_metrics.repart_time.timer();
-                        let indices = partition_indices.into();
-                        // Produce batches based on indices
-                        let columns = input_batch
-                            .columns()
-                            .iter()
-                            .map(|c| {
-                                take(c.as_ref(), &indices, None).map_err(|e| {
-                                    DataFusionError::Execution(e.to_string())
-                                })
-                            })
-                            .collect::<Result<Vec<Arc<dyn Array>>>>()?;
-                        let output_batch =
-                            RecordBatch::try_new(input_batch.schema(), columns);
-                        timer.done();
-
-                        let timer = r_metrics.send_time.timer();
-                        // if there is still a receiver, send to it
-                        if let Some(tx) = txs.get_mut(&num_output_partition) {
-                            if tx.send(Some(output_batch)).is_err() {
-                                // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
-                                txs.remove(&num_output_partition);
-                            }
-                        }
-                        timer.done();
-                    }
-                }
-                other => {
-                    // this should be unreachable as long as the validation logic
-                    // in the constructor is kept up-to-date
-                    return Err(DataFusionError::NotImplemented(format!(
-                        "Unsupported repartitioning scheme {:?}",
-                        other
-                    )));
-                }
-            }
-            counter += 1;
+                timer.done();
+                Ok(())
+            })?;
         }
 
         Ok(())