You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/14 19:46:10 UTC

[arrow-datafusion] branch master updated: chore: add `debug!` log in some execution operators (#2231)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d631a9ca2 chore: add `debug!` log in some execution operators (#2231)
d631a9ca2 is described below

commit d631a9ca227c3f0e866b221904e0cee78f40262c
Author: Nga Tran <ng...@live.com>
AuthorDate: Thu Apr 14 15:46:05 2022 -0400

    chore: add `debug!` log in some execution operators (#2231)
    
    * chore: Add some debug info for plan execution
    
    * chore: more debug log
    
    * refactor: address review comments
---
 datafusion/core/src/physical_plan/empty.rs         |  4 ++-
 datafusion/core/src/physical_plan/explain.rs       |  7 ++++-
 datafusion/core/src/physical_plan/filter.rs        |  3 ++-
 datafusion/core/src/physical_plan/limit.rs         |  6 +++++
 datafusion/core/src/physical_plan/projection.rs    |  2 ++
 datafusion/core/src/physical_plan/repartition.rs   | 10 ++++++++
 datafusion/core/src/physical_plan/sorts/sort.rs    | 30 +++++++++++++++++++---
 .../physical_plan/sorts/sort_preserving_merge.rs   | 24 ++++++++++++++---
 datafusion/core/src/physical_plan/union.rs         |  5 ++++
 9 files changed, 82 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 42ed368e1..43e749e1d 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -27,6 +27,7 @@ use crate::physical_plan::{
 use arrow::array::NullArray;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use log::debug;
 
 use super::expressions::PhysicalSortExpr;
 use super::{common, SendableRecordBatchStream, Statistics};
@@ -116,8 +117,9 @@ impl ExecutionPlan for EmptyExec {
     async fn execute(
         &self,
         partition: usize,
-        _context: Arc<TaskContext>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         // EmptyExec has a single output partition
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index 4905e9e17..fb6631588 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -29,6 +29,7 @@ use crate::{
     },
 };
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
+use log::debug;
 
 use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
 use crate::execution::context::TaskContext;
@@ -112,8 +113,9 @@ impl ExecutionPlan for ExplainExec {
     async fn execute(
         &self,
         partition: usize,
-        _context: Arc<TaskContext>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!("Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
                 "ExplainExec invalid partition {}",
@@ -156,6 +158,9 @@ impl ExecutionPlan for ExplainExec {
         let metrics = ExecutionPlanMetricsSet::new();
         let tracking_metrics = MemTrackingMetrics::new(&metrics, partition);
 
+        debug!(
+            "Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+
         Ok(Box::pin(SizedRecordBatchStream::new(
             self.schema.clone(),
             vec![Arc::new(record_batch)],
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index ff86d0893..158bedf28 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -37,6 +37,7 @@ use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
 use async_trait::async_trait;
+use log::debug;
 
 use crate::execution::context::TaskContext;
 use futures::stream::{Stream, StreamExt};
@@ -133,8 +134,8 @@ impl ExecutionPlan for FilterExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-
         Ok(Box::pin(FilterExecStream {
             schema: self.input.schema().clone(),
             predicate: self.predicate.clone(),
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 1cf99b3f5..71e08afd2 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -24,6 +24,7 @@ use std::task::{Context, Poll};
 
 use futures::stream::Stream;
 use futures::stream::StreamExt;
+use log::debug;
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
@@ -131,6 +132,10 @@ impl ExecutionPlan for GlobalLimitExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!(
+            "Start GlobalLimitExec::execute for partition: {}",
+            partition
+        );
         // GlobalLimitExec has a single output partition
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
@@ -282,6 +287,7 @@ impl ExecutionPlan for LocalLimitExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!("Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         let stream = self.input.execute(partition, context).await?;
         Ok(Box::pin(LimitStream::new(
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index a49d00669..ae422b3d0 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -33,6 +33,7 @@ use crate::physical_plan::{
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
+use log::debug;
 
 use super::expressions::{Column, PhysicalSortExpr};
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
@@ -150,6 +151,7 @@ impl ExecutionPlan for ProjectionExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         Ok(Box::pin(ProjectionStream {
             schema: self.schema.clone(),
             expr: self.expr.iter().map(|x| x.0.clone()).collect(),
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 18e87e09d..036421637 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -29,6 +29,7 @@ use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Stati
 use arrow::record_batch::RecordBatch;
 use arrow::{array::Array, error::Result as ArrowResult};
 use arrow::{compute::take, datatypes::SchemaRef};
+use log::debug;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 
 use super::common::{AbortOnDropMany, AbortOnDropSingle};
@@ -174,6 +175,10 @@ impl ExecutionPlan for RepartitionExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!(
+            "Start RepartitionExec::execute for partition: {}",
+            partition
+        );
         // lock mutexes
         let mut state = self.state.lock().await;
 
@@ -231,6 +236,11 @@ impl ExecutionPlan for RepartitionExec {
             state.abort_helper = Arc::new(AbortOnDropMany(join_handles))
         }
 
+        debug!(
+            "Before returning stream in RepartitionExec::execute for partition: {}",
+            partition
+        );
+
         // now return stream for the specified *output* partition which will
         // read from the channel
         Ok(Box::pin(RepartitionStream {
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 1d13efd14..8edd20a8b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -753,6 +753,7 @@ impl ExecutionPlan for SortExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         if !self.preserve_partitioning {
             if 0 != partition {
                 return Err(DataFusionError::Internal(format!(
@@ -769,16 +770,26 @@ impl ExecutionPlan for SortExec {
             }
         }
 
+        debug!(
+            "Start invoking SortExec's input.execute for partition: {}",
+            partition
+        );
+
         let input = self.input.execute(partition, context.clone()).await?;
 
-        do_sort(
+        debug!("End SortExec's input.execute for partition: {}", partition);
+
+        let result = do_sort(
             input,
             partition,
             self.expr.clone(),
             self.metrics_set.clone(),
             context,
         )
-        .await
+        .await;
+
+        debug!("End SortExec::execute for partition {}", partition);
+        result
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
@@ -867,6 +878,12 @@ async fn do_sort(
     metrics_set: CompositeMetricsSet,
     context: Arc<TaskContext>,
 ) -> Result<SendableRecordBatchStream> {
+    debug!(
+        "Start do_sort for partition {} of context session_id {} and task_id {:?}",
+        partition_id,
+        context.session_id(),
+        context.task_id()
+    );
     let schema = input.schema();
     let tracking_metrics =
         metrics_set.new_intermediate_tracking(partition_id, context.runtime_env());
@@ -883,7 +900,14 @@ async fn do_sort(
         let batch = batch?;
         sorter.insert_batch(batch, &tracking_metrics).await?;
     }
-    sorter.sort().await
+    let result = sorter.sort().await;
+    debug!(
+        "End do_sort for partition {} of context session_id {} and task_id {:?}",
+        partition_id,
+        context.session_id(),
+        context.task_id()
+    );
+    result
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 4b0fccd26..4bc3606e0 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -21,6 +21,7 @@ use crate::physical_plan::common::AbortOnDropMany;
 use crate::physical_plan::metrics::{
     ExecutionPlanMetricsSet, MemTrackingMetrics, MetricsSet,
 };
+use log::debug;
 use parking_lot::Mutex;
 use std::any::Any;
 use std::collections::{BinaryHeap, VecDeque};
@@ -155,6 +156,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!(
+            "Start SortPreservingMergeExec::execute for partition: {}",
+            partition
+        );
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
                 "SortPreservingMergeExec invalid partition {}",
@@ -165,6 +170,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
         let tracking_metrics = MemTrackingMetrics::new(&self.metrics, partition);
 
         let input_partitions = self.input.output_partitioning().partition_count();
+        debug!(
+            "Number of input partitions of  SortPreservingMergeExec::execute: {}",
+            input_partitions
+        );
         match input_partitions {
             0 => Err(DataFusionError::Internal(
                 "SortPreservingMergeExec requires at least one input partition"
@@ -172,7 +181,9 @@ impl ExecutionPlan for SortPreservingMergeExec {
             )),
             1 => {
                 // bypass if there is only one partition to merge (no metrics in this case either)
-                self.input.execute(0, context).await
+                let result = self.input.execute(0, context).await;
+                debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input");
+                result
             }
             _ => {
                 let (receivers, join_handles) = (0..input_partitions)
@@ -189,14 +200,20 @@ impl ExecutionPlan for SortPreservingMergeExec {
                     })
                     .unzip();
 
-                Ok(Box::pin(SortPreservingMergeStream::new_from_receivers(
+                debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute");
+
+                let result = Box::pin(SortPreservingMergeStream::new_from_receivers(
                     receivers,
                     AbortOnDropMany(join_handles),
                     self.schema(),
                     &self.expr,
                     tracking_metrics,
                     context.session_config().batch_size,
-                )))
+                ));
+
+                debug!("Got stream result from SortPreservingMergeStream::new_from_receivers");
+
+                Ok(result)
             }
         }
     }
@@ -299,6 +316,7 @@ impl SortPreservingMergeStream {
         tracking_metrics: MemTrackingMetrics,
         batch_size: usize,
     ) -> Self {
+        debug!("Start SortPreservingMergeStream::new_from_receivers");
         let stream_count = receivers.len();
         let batches = (0..stream_count)
             .into_iter()
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index 8c730ea0e..b794cad26 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -25,6 +25,7 @@ use std::{any::Any, sync::Arc};
 
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use futures::StreamExt;
+use log::debug;
 
 use super::{
     expressions::PhysicalSortExpr,
@@ -111,6 +112,7 @@ impl ExecutionPlan for UnionExec {
         mut partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        debug!("Start UnionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         // record the tiny amount of work done in this function so
         // elapsed_compute is reported as non zero
@@ -122,12 +124,15 @@ impl ExecutionPlan for UnionExec {
             // Calculate whether partition belongs to the current partition
             if partition < input.output_partitioning().partition_count() {
                 let stream = input.execute(partition, context.clone()).await?;
+                debug!("Found a Union partition to execute");
                 return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
             } else {
                 partition -= input.output_partitioning().partition_count();
             }
         }
 
+        debug!("Error in Union: Partition {} not found", partition);
+
         Err(crate::error::DataFusionError::Execution(format!(
             "Partition {} not found in Union",
             partition