You are viewing a plain-text version of this content. The link to its canonical (web) version was not preserved in this plain-text rendering.
Posted to github@arrow.apache.org by "crepererum (via GitHub)" <gi...@apache.org> on 2023/04/19 08:43:02 UTC

[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #6045: fix: split "union" and "interleave"

crepererum commented on code in PR #6045:
URL: https://github.com/apache/arrow-datafusion/pull/6045#discussion_r1171016354


##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -273,36 +230,210 @@ impl ExecutionPlan for UnionExec {
         let elapsed_compute = baseline_metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer(); // record on drop
 
-        if self.partition_aware {
-            let mut input_stream_vec = vec![];
-            for input in self.inputs.iter() {
-                if partition < input.output_partitioning().partition_count() {
-                    input_stream_vec.push(input.execute(partition, context.clone())?);
-                } else {
-                    // Do not find a partition to execute
-                    break;
-                }
-            }
-            if input_stream_vec.len() == self.inputs.len() {
-                let stream = Box::pin(CombinedRecordBatchStream::new(
-                    self.schema(),
-                    input_stream_vec,
-                ));
+        // find partition to execute
+        for input in self.inputs.iter() {
+            // Calculate whether partition belongs to the current partition
+            if partition < input.output_partitioning().partition_count() {
+                let stream = input.execute(partition, context)?;
+                debug!("Found a Union partition to execute");
                 return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+            } else {
+                partition -= input.output_partitioning().partition_count();
             }
+        }
+
+        warn!("Error in Union: Partition {} not found", partition);
+
+        Err(crate::error::DataFusionError::Execution(format!(
+            "Partition {partition} not found in Union"
+        )))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "UnionExec")
+            }
+        }
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.inputs
+            .iter()
+            .map(|ep| ep.statistics())
+            .reduce(stats_union)
+            .unwrap_or_default()
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+}
+
+/// Combines multiple input streams by interleaving them.
+///
+/// This only works if all inputs have the same hash-partitioning.
+///
+/// # Data Flow
+/// ```text
+/// +---------+
+/// |         |---+
+/// | Input 1 |   |
+/// |         |-------------+
+/// +---------+   |         |     
+///               |         |         +---------+
+///               +------------------>|         |
+///                 +---------------->| Combine |-->
+///                 | +-------------->|         |
+///                 | |     |         +---------+
+/// +---------+     | |     |       
+/// |         |-----+ |     |
+/// | Input 2 |       |     |
+/// |         |---------------+
+/// +---------+       |     | |    
+///                   |     | |       +---------+
+///                   |     +-------->|         |
+///                   |       +------>| Combine |-->
+///                   |         +---->|         |
+///                   |         |     +---------+
+/// +---------+       |         |     
+/// |         |-------+         |
+/// | Input 3 |                 |
+/// |         |-----------------+
+/// +---------+
+/// ```
+#[derive(Debug)]
+pub struct InterleaveExec {
+    /// Input execution plan
+    inputs: Vec<Arc<dyn ExecutionPlan>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// Schema of Union
+    schema: SchemaRef,
+}
+
+impl InterleaveExec {
+    /// Create a new InterleaveExec
+    pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
+        let schema = union_schema(&inputs);
+
+        if !can_interleave(&inputs) {
+            return Err(DataFusionError::Internal(String::from(
+                "Not all InterleaveExec children have a consistent hash partitioning",
+            )));
+        }
+
+        Ok(InterleaveExec {
+            inputs,
+            metrics: ExecutionPlanMetricsSet::new(),
+            schema,
+        })
+    }
+
+    /// Get inputs of the execution plan
+    pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
+        &self.inputs
+    }
+}
+
+impl ExecutionPlan for InterleaveExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but it its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children.iter().any(|x| *x))
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        self.inputs.clone()
+    }
+
+    /// Output of the union is the combination of all output partitions of the inputs
+    fn output_partitioning(&self) -> Partitioning {
+        self.inputs[0].output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // If the Union has an output ordering, it maintains at least one
+        // child's ordering (i.e. the meet).
+        // For instance, assume that the first child is SortExpr('a','b','c'),
+        // the second child is SortExpr('a','b') and the third child is
+        // SortExpr('a','b'). The output ordering would be SortExpr('a','b'),
+        // which is the "meet" of all input orderings. In this example, this
+        // function will return vec![false, true, true], indicating that we
+        // preserve the orderings for the 2nd and the 3rd children.
+        if let Some(output_ordering) = self.output_ordering() {
+            self.inputs()
+                .iter()
+                .map(|child| {
+                    if let Some(child_ordering) = child.output_ordering() {
+                        output_ordering.len() == child_ordering.len()
+                    } else {
+                        false
+                    }
+                })
+                .collect()
         } else {
-            // find partition to execute
-            for input in self.inputs.iter() {
-                // Calculate whether partition belongs to the current partition
-                if partition < input.output_partitioning().partition_count() {
-                    let stream = input.execute(partition, context)?;
-                    debug!("Found a Union partition to execute");
-                    return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
-                } else {
-                    partition -= input.output_partitioning().partition_count();
-                }
+            vec![false; self.inputs().len()]
+        }
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(InterleaveExec::try_new(children)?))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        debug!("Start InterleaveExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+        // record the tiny amount of work done in this function so
+        // elapsed_compute is reported as non zero
+        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
+        let _timer = elapsed_compute.timer(); // record on drop
+
+        let mut input_stream_vec = vec![];
+        for input in self.inputs.iter() {
+            if partition < input.output_partitioning().partition_count() {
+                input_stream_vec.push(input.execute(partition, context.clone())?);
+            } else {
+                // Do not find a partition to execute
+                break;
             }
         }
+        if input_stream_vec.len() == self.inputs.len() {
+            let stream = Box::pin(CombinedRecordBatchStream::new(
+                self.schema(),
+                input_stream_vec,
+            ));
+            return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+        }
 
         warn!("Error in Union: Partition {} not found", partition);
 

Review Comment:
   done



##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -273,36 +230,210 @@ impl ExecutionPlan for UnionExec {
         let elapsed_compute = baseline_metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer(); // record on drop
 
-        if self.partition_aware {
-            let mut input_stream_vec = vec![];
-            for input in self.inputs.iter() {
-                if partition < input.output_partitioning().partition_count() {
-                    input_stream_vec.push(input.execute(partition, context.clone())?);
-                } else {
-                    // Do not find a partition to execute
-                    break;
-                }
-            }
-            if input_stream_vec.len() == self.inputs.len() {
-                let stream = Box::pin(CombinedRecordBatchStream::new(
-                    self.schema(),
-                    input_stream_vec,
-                ));
+        // find partition to execute
+        for input in self.inputs.iter() {
+            // Calculate whether partition belongs to the current partition
+            if partition < input.output_partitioning().partition_count() {
+                let stream = input.execute(partition, context)?;
+                debug!("Found a Union partition to execute");
                 return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+            } else {
+                partition -= input.output_partitioning().partition_count();
             }
+        }
+
+        warn!("Error in Union: Partition {} not found", partition);
+
+        Err(crate::error::DataFusionError::Execution(format!(
+            "Partition {partition} not found in Union"
+        )))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "UnionExec")
+            }
+        }
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.inputs
+            .iter()
+            .map(|ep| ep.statistics())
+            .reduce(stats_union)
+            .unwrap_or_default()
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+}
+
+/// Combines multiple input streams by interleaving them.
+///
+/// This only works if all inputs have the same hash-partitioning.
+///
+/// # Data Flow
+/// ```text
+/// +---------+
+/// |         |---+
+/// | Input 1 |   |
+/// |         |-------------+
+/// +---------+   |         |     
+///               |         |         +---------+
+///               +------------------>|         |
+///                 +---------------->| Combine |-->
+///                 | +-------------->|         |
+///                 | |     |         +---------+
+/// +---------+     | |     |       
+/// |         |-----+ |     |
+/// | Input 2 |       |     |
+/// |         |---------------+
+/// +---------+       |     | |    
+///                   |     | |       +---------+
+///                   |     +-------->|         |
+///                   |       +------>| Combine |-->
+///                   |         +---->|         |
+///                   |         |     +---------+
+/// +---------+       |         |     
+/// |         |-------+         |
+/// | Input 3 |                 |
+/// |         |-----------------+
+/// +---------+
+/// ```
+#[derive(Debug)]
+pub struct InterleaveExec {
+    /// Input execution plan
+    inputs: Vec<Arc<dyn ExecutionPlan>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// Schema of Union
+    schema: SchemaRef,
+}
+
+impl InterleaveExec {
+    /// Create a new InterleaveExec
+    pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
+        let schema = union_schema(&inputs);
+
+        if !can_interleave(&inputs) {
+            return Err(DataFusionError::Internal(String::from(
+                "Not all InterleaveExec children have a consistent hash partitioning",
+            )));
+        }
+
+        Ok(InterleaveExec {
+            inputs,
+            metrics: ExecutionPlanMetricsSet::new(),
+            schema,
+        })
+    }
+
+    /// Get inputs of the execution plan
+    pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
+        &self.inputs
+    }
+}
+
+impl ExecutionPlan for InterleaveExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// If the plan does not support pipelining, but it its input(s) are
+    /// infinite, returns an error to indicate this.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children.iter().any(|x| *x))
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        self.inputs.clone()
+    }
+
+    /// Output of the union is the combination of all output partitions of the inputs
+    fn output_partitioning(&self) -> Partitioning {
+        self.inputs[0].output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // If the Union has an output ordering, it maintains at least one
+        // child's ordering (i.e. the meet).
+        // For instance, assume that the first child is SortExpr('a','b','c'),
+        // the second child is SortExpr('a','b') and the third child is
+        // SortExpr('a','b'). The output ordering would be SortExpr('a','b'),
+        // which is the "meet" of all input orderings. In this example, this
+        // function will return vec![false, true, true], indicating that we
+        // preserve the orderings for the 2nd and the 3rd children.
+        if let Some(output_ordering) = self.output_ordering() {
+            self.inputs()
+                .iter()
+                .map(|child| {
+                    if let Some(child_ordering) = child.output_ordering() {
+                        output_ordering.len() == child_ordering.len()
+                    } else {
+                        false
+                    }
+                })
+                .collect()
         } else {
-            // find partition to execute
-            for input in self.inputs.iter() {
-                // Calculate whether partition belongs to the current partition
-                if partition < input.output_partitioning().partition_count() {
-                    let stream = input.execute(partition, context)?;
-                    debug!("Found a Union partition to execute");
-                    return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
-                } else {
-                    partition -= input.output_partitioning().partition_count();
-                }
+            vec![false; self.inputs().len()]
+        }
+    }

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org