Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/04 10:25:01 UTC

[arrow-datafusion] branch main updated: Lower some log levels to trace during plan execution (#6193)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2632816f66 Lower some log levels to trace during plan execution (#6193)
2632816f66 is described below

commit 2632816f664334c96bb2ce22d0f0e12509561613
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu May 4 06:24:55 2023 -0400

    Lower some log levels to trace during plan execution (#6193)
---
 datafusion/core/src/physical_plan/empty.rs                |  4 ++--
 datafusion/core/src/physical_plan/explain.rs              |  6 +++---
 datafusion/core/src/physical_plan/filter.rs               |  4 ++--
 datafusion/core/src/physical_plan/limit.rs                |  6 +++---
 datafusion/core/src/physical_plan/projection.rs           |  4 ++--
 datafusion/core/src/physical_plan/repartition/mod.rs      |  8 ++++----
 datafusion/core/src/physical_plan/sorts/sort.rs           | 15 +++++----------
 .../core/src/physical_plan/sorts/sort_preserving_merge.rs |  6 +++---
 datafusion/core/src/physical_plan/union.rs                | 15 +++++++--------
 datafusion/core/src/physical_plan/unnest.rs               |  6 +++---
 10 files changed, 34 insertions(+), 40 deletions(-)
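
The change is mechanical: the per-partition messages logged on entry to (and exit from) each operator's ExecutionPlan::execute move from the log crate's debug! macro to its trace! macro, so they only appear at the most verbose filter level. As a minimal standalone sketch of the pattern (illustration only, not DataFusion code; the function and message text here are hypothetical):

    use log::{debug, trace};

    fn execute(partition: usize) {
        // Before this commit: emitted at Level::Debug, so visible
        // whenever debug-level logging is enabled.
        debug!("Start execute for partition {}", partition);

        // After this commit: emitted at Level::Trace, the most verbose
        // level, so it is filtered out unless trace logging is
        // explicitly enabled.
        trace!("Start execute for partition {}", partition);
    }

Because the log crate is only a facade, whether either call prints depends on the logger backend and its configured filter; lowering these calls to trace keeps per-partition execution noise out of typical debug-level output.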

diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index d35db5b645..33258f28fa 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::{
 use arrow::array::{ArrayRef, NullArray};
 use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use log::debug;
+use log::trace;
 
 use super::expressions::PhysicalSortExpr;
 use super::{common, SendableRecordBatchStream, Statistics};
@@ -132,7 +132,7 @@ impl ExecutionPlan for EmptyExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+        trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
 
         if partition >= self.partitions {
             return Err(DataFusionError::Internal(format!(
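The messages are still recoverable after this change. A minimal sketch, assuming the embedding application uses env_logger as its log backend (an assumption; DataFusion does not mandate a particular backend):

    // main.rs of a hypothetical application embedding DataFusion.
    fn main() {
        // env_logger reads the RUST_LOG environment variable, e.g.
        //   RUST_LOG=datafusion=trace
        // which re-enables the per-partition execute messages lowered
        // to trace in this commit.
        env_logger::init();
        // ... build and run a DataFusion query as usual ...
    }
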
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index 93fcfe45da..c46a4a7391 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -29,7 +29,7 @@ use crate::{
     },
 };
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
-use log::debug;
+use log::trace;
 
 use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
 use crate::execution::context::TaskContext;
@@ -109,7 +109,7 @@ impl ExecutionPlan for ExplainExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!("Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+        trace!("Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
                 "ExplainExec invalid partition {partition}"
@@ -154,7 +154,7 @@ impl ExecutionPlan for ExplainExec {
         let tracking_metrics =
             MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
 
-        debug!(
+        trace!(
             "Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
 
         Ok(Box::pin(SizedRecordBatchStream::new(
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 3786568cf3..67c990a7e3 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -39,7 +39,7 @@ use datafusion_expr::Operator;
 use datafusion_physical_expr::expressions::BinaryExpr;
 use datafusion_physical_expr::{split_conjunction, AnalysisContext};
 
-use log::debug;
+use log::trace;
 
 use crate::execution::context::TaskContext;
 use futures::stream::{Stream, StreamExt};
@@ -147,7 +147,7 @@ impl ExecutionPlan for FilterExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+        trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         Ok(Box::pin(FilterExecStream {
             schema: self.input.schema(),
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index bfeb9c65b9..1adec23252 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -19,7 +19,7 @@
 
 use futures::stream::Stream;
 use futures::stream::StreamExt;
-use log::debug;
+use log::trace;
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -136,7 +136,7 @@ impl ExecutionPlan for GlobalLimitExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!(
+        trace!(
             "Start GlobalLimitExec::execute for partition: {}",
             partition
         );
@@ -320,7 +320,7 @@ impl ExecutionPlan for LocalLimitExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!("Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+        trace!("Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         let stream = self.input.execute(partition, context)?;
         Ok(Box::pin(LimitStream::new(
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index e70e4faebd..f2775079fc 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -35,7 +35,7 @@ use crate::physical_plan::{
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use futures::stream::{Stream, StreamExt};
-use log::debug;
+use log::trace;
 
 use super::expressions::{Column, PhysicalSortExpr};
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
@@ -251,7 +251,7 @@ impl ExecutionPlan for ProjectionExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+        trace!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         Ok(Box::pin(ProjectionStream {
             schema: self.schema.clone(),
             expr: self.expr.iter().map(|x| x.0.clone()).collect(),
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 8db230a122..67fc63d235 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -33,7 +33,7 @@ use crate::physical_plan::{
 use arrow::array::{ArrayRef, UInt64Builder};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use log::debug;
+use log::trace;
 
 use self::distributor_channels::{DistributionReceiver, DistributionSender};
 
@@ -319,7 +319,7 @@ impl ExecutionPlan for RepartitionExec {
 
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but its input(s) are
-    /// infinite, returns an error to indicate this.    
+    /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])
     }
@@ -350,7 +350,7 @@ impl ExecutionPlan for RepartitionExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!(
+        trace!(
             "Start RepartitionExec::execute for partition: {}",
             partition
         );
@@ -411,7 +411,7 @@ impl ExecutionPlan for RepartitionExec {
             state.abort_helper = Arc::new(AbortOnDropMany(join_handles))
         }
 
-        debug!(
+        trace!(
             "Before returning stream in RepartitionExec::execute for partition: {}",
             partition
         );
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 9461f9689e..12018944d4 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -44,7 +44,7 @@ use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
 use datafusion_physical_expr::EquivalenceProperties;
 use futures::{StreamExt, TryStreamExt};
-use log::{debug, error};
+use log::{debug, error, trace};
 use std::any::Any;
 use std::fmt;
 use std::fmt::{Debug, Formatter};
@@ -582,16 +582,11 @@ impl ExecutionPlan for SortExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
-
-        debug!(
-            "Start invoking SortExec's input.execute for partition: {}",
-            partition
-        );
+        trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
 
         let input = self.input.execute(partition, context.clone())?;
 
-        debug!("End SortExec's input.execute for partition: {}", partition);
+        trace!("End SortExec's input.execute for partition: {}", partition);
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
@@ -642,7 +637,7 @@ async fn do_sort(
     context: Arc<TaskContext>,
     fetch: Option<usize>,
 ) -> Result<SendableRecordBatchStream> {
-    debug!(
+    trace!(
         "Start do_sort for partition {} of context session_id {} and task_id {:?}",
         partition_id,
         context.session_id(),
@@ -663,7 +658,7 @@ async fn do_sort(
         sorter.insert_batch(batch).await?;
     }
     let result = sorter.sort();
-    debug!(
+    trace!(
         "End do_sort for partition {} of context session_id {} and task_id {:?}",
         partition_id,
         context.session_id(),
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index e96be05f4a..b780583601 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -21,7 +21,7 @@ use std::any::Any;
 use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
-use log::debug;
+use log::{debug, trace};
 use tokio::sync::mpsc;
 
 use crate::error::{DataFusionError, Result};
@@ -149,7 +149,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!(
+        trace!(
             "Start SortPreservingMergeExec::execute for partition: {}",
             partition
         );
@@ -163,7 +163,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
             MemTrackingMetrics::new(&self.metrics, context.memory_pool(), partition);
 
         let input_partitions = self.input.output_partitioning().partition_count();
-        debug!(
+        trace!(
             "Number of input partitions of  SortPreservingMergeExec::execute: {}",
             input_partitions
         );
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index cace842aaf..d1f5ec0c29 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -32,8 +32,7 @@ use arrow::{
 use datafusion_common::{DFSchemaRef, DataFusionError};
 use futures::{Stream, StreamExt};
 use itertools::Itertools;
-use log::debug;
-use log::warn;
+use log::{debug, trace, warn};
 
 use super::{
     expressions::PhysicalSortExpr,
@@ -223,7 +222,7 @@ impl ExecutionPlan for UnionExec {
         mut partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!("Start UnionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+        trace!("Start UnionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         // record the tiny amount of work done in this function so
         // elapsed_compute is reported as non zero
@@ -288,23 +287,23 @@ impl ExecutionPlan for UnionExec {
 /// |         |---+
 /// | Input 1 |   |
 /// |         |-------------+
-/// +---------+   |         |     
+/// +---------+   |         |
 ///               |         |         +---------+
 ///               +------------------>|         |
 ///                 +---------------->| Combine |-->
 ///                 | +-------------->|         |
 ///                 | |     |         +---------+
-/// +---------+     | |     |       
+/// +---------+     | |     |
 /// |         |-----+ |     |
 /// | Input 2 |       |     |
 /// |         |---------------+
-/// +---------+       |     | |    
+/// +---------+       |     | |
 ///                   |     | |       +---------+
 ///                   |     +-------->|         |
 ///                   |       +------>| Combine |-->
 ///                   |         +---->|         |
 ///                   |         |     +---------+
-/// +---------+       |         |     
+/// +---------+       |         |
 /// |         |-------+         |
 /// | Input 3 |                 |
 /// |         |-----------------+
@@ -392,7 +391,7 @@ impl ExecutionPlan for InterleaveExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        debug!("Start InterleaveExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+        trace!("Start InterleaveExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         // record the tiny amount of work done in this function so
         // elapsed_compute is reported as non zero
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs
index 9a9408035d..9a84af5523 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -26,7 +26,7 @@ use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use futures::Stream;
 use futures::StreamExt;
-use log::debug;
+use log::trace;
 use std::time::Instant;
 use std::{any::Any, sync::Arc};
 
@@ -78,7 +78,7 @@ impl ExecutionPlan for UnnestExec {
 
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but its input(s) are
-    /// infinite, returns an error to indicate this.    
+    /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         Ok(children[0])
     }
@@ -208,7 +208,7 @@ impl UnnestStream {
                     Some(result)
                 }
                 other => {
-                    debug!(
+                    trace!(
                         "Processed {} probe-side input batches containing {} rows and \
                         produced {} output batches containing {} rows in {} ms",
                         self.num_input_batches,