You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/04 10:25:01 UTC
[arrow-datafusion] branch main updated: Lower some log levels to trace during plan execution (#6193)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2632816f66 Lower some log levels to trace during plan execution (#6193)
2632816f66 is described below
commit 2632816f664334c96bb2ce22d0f0e12509561613
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu May 4 06:24:55 2023 -0400
Lower some log levels to trace during plan execution (#6193)
---
datafusion/core/src/physical_plan/empty.rs | 4 ++--
datafusion/core/src/physical_plan/explain.rs | 6 +++---
datafusion/core/src/physical_plan/filter.rs | 4 ++--
datafusion/core/src/physical_plan/limit.rs | 6 +++---
datafusion/core/src/physical_plan/projection.rs | 4 ++--
datafusion/core/src/physical_plan/repartition/mod.rs | 8 ++++----
datafusion/core/src/physical_plan/sorts/sort.rs | 15 +++++----------
.../core/src/physical_plan/sorts/sort_preserving_merge.rs | 6 +++---
datafusion/core/src/physical_plan/union.rs | 15 +++++++--------
datafusion/core/src/physical_plan/unnest.rs | 6 +++---
10 files changed, 34 insertions(+), 40 deletions(-)
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index d35db5b645..33258f28fa 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -27,7 +27,7 @@ use crate::physical_plan::{
use arrow::array::{ArrayRef, NullArray};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use log::debug;
+use log::trace;
use super::expressions::PhysicalSortExpr;
use super::{common, SendableRecordBatchStream, Statistics};
@@ -132,7 +132,7 @@ impl ExecutionPlan for EmptyExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
if partition >= self.partitions {
return Err(DataFusionError::Internal(format!(
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index 93fcfe45da..c46a4a7391 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -29,7 +29,7 @@ use crate::{
},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
-use log::debug;
+use log::trace;
use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
@@ -109,7 +109,7 @@ impl ExecutionPlan for ExplainExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!("Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ trace!("Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"ExplainExec invalid partition {partition}"
@@ -154,7 +154,7 @@ impl ExecutionPlan for ExplainExec {
let tracking_metrics =
MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
- debug!(
+ trace!(
"Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
Ok(Box::pin(SizedRecordBatchStream::new(
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 3786568cf3..67c990a7e3 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -39,7 +39,7 @@ use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::{split_conjunction, AnalysisContext};
-use log::debug;
+use log::trace;
use crate::execution::context::TaskContext;
use futures::stream::{Stream, StreamExt};
@@ -147,7 +147,7 @@ impl ExecutionPlan for FilterExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(FilterExecStream {
schema: self.input.schema(),
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index bfeb9c65b9..1adec23252 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -19,7 +19,7 @@
use futures::stream::Stream;
use futures::stream::StreamExt;
-use log::debug;
+use log::trace;
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
@@ -136,7 +136,7 @@ impl ExecutionPlan for GlobalLimitExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!(
+ trace!(
"Start GlobalLimitExec::execute for partition: {}",
partition
);
@@ -320,7 +320,7 @@ impl ExecutionPlan for LocalLimitExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!("Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ trace!("Start LocalLimitExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let stream = self.input.execute(partition, context)?;
Ok(Box::pin(LimitStream::new(
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index e70e4faebd..f2775079fc 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -35,7 +35,7 @@ use crate::physical_plan::{
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use futures::stream::{Stream, StreamExt};
-use log::debug;
+use log::trace;
use super::expressions::{Column, PhysicalSortExpr};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
@@ -251,7 +251,7 @@ impl ExecutionPlan for ProjectionExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ trace!("Start ProjectionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
Ok(Box::pin(ProjectionStream {
schema: self.schema.clone(),
expr: self.expr.iter().map(|x| x.0.clone()).collect(),
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 8db230a122..67fc63d235 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -33,7 +33,7 @@ use crate::physical_plan::{
use arrow::array::{ArrayRef, UInt64Builder};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
-use log::debug;
+use log::trace;
use self::distributor_channels::{DistributionReceiver, DistributionSender};
@@ -319,7 +319,7 @@ impl ExecutionPlan for RepartitionExec {
/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but its input(s) are
- /// infinite, returns an error to indicate this.
+ /// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
}
@@ -350,7 +350,7 @@ impl ExecutionPlan for RepartitionExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!(
+ trace!(
"Start RepartitionExec::execute for partition: {}",
partition
);
@@ -411,7 +411,7 @@ impl ExecutionPlan for RepartitionExec {
state.abort_helper = Arc::new(AbortOnDropMany(join_handles))
}
- debug!(
+ trace!(
"Before returning stream in RepartitionExec::execute for partition: {}",
partition
);
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 9461f9689e..12018944d4 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -44,7 +44,7 @@ use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::EquivalenceProperties;
use futures::{StreamExt, TryStreamExt};
-use log::{debug, error};
+use log::{debug, error, trace};
use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
@@ -582,16 +582,11 @@ impl ExecutionPlan for SortExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
-
- debug!(
- "Start invoking SortExec's input.execute for partition: {}",
- partition
- );
+ trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let input = self.input.execute(partition, context.clone())?;
- debug!("End SortExec's input.execute for partition: {}", partition);
+ trace!("End SortExec's input.execute for partition: {}", partition);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
@@ -642,7 +637,7 @@ async fn do_sort(
context: Arc<TaskContext>,
fetch: Option<usize>,
) -> Result<SendableRecordBatchStream> {
- debug!(
+ trace!(
"Start do_sort for partition {} of context session_id {} and task_id {:?}",
partition_id,
context.session_id(),
@@ -663,7 +658,7 @@ async fn do_sort(
sorter.insert_batch(batch).await?;
}
let result = sorter.sort();
- debug!(
+ trace!(
"End do_sort for partition {} of context session_id {} and task_id {:?}",
partition_id,
context.session_id(),
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index e96be05f4a..b780583601 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
-use log::debug;
+use log::{debug, trace};
use tokio::sync::mpsc;
use crate::error::{DataFusionError, Result};
@@ -149,7 +149,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!(
+ trace!(
"Start SortPreservingMergeExec::execute for partition: {}",
partition
);
@@ -163,7 +163,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
MemTrackingMetrics::new(&self.metrics, context.memory_pool(), partition);
let input_partitions = self.input.output_partitioning().partition_count();
- debug!(
+ trace!(
"Number of input partitions of SortPreservingMergeExec::execute: {}",
input_partitions
);
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index cace842aaf..d1f5ec0c29 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -32,8 +32,7 @@ use arrow::{
use datafusion_common::{DFSchemaRef, DataFusionError};
use futures::{Stream, StreamExt};
use itertools::Itertools;
-use log::debug;
-use log::warn;
+use log::{debug, trace, warn};
use super::{
expressions::PhysicalSortExpr,
@@ -223,7 +222,7 @@ impl ExecutionPlan for UnionExec {
mut partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!("Start UnionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ trace!("Start UnionExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
// record the tiny amount of work done in this function so
// elapsed_compute is reported as non zero
@@ -288,23 +287,23 @@ impl ExecutionPlan for UnionExec {
/// | |---+
/// | Input 1 | |
/// | |-------------+
-/// +---------+ | |
+/// +---------+ | |
/// | | +---------+
/// +------------------>| |
/// +---------------->| Combine |-->
/// | +-------------->| |
/// | | | +---------+
-/// +---------+ | | |
+/// +---------+ | | |
/// | |-----+ | |
/// | Input 2 | | |
/// | |---------------+
-/// +---------+ | | |
+/// +---------+ | | |
/// | | | +---------+
/// | +-------->| |
/// | +------>| Combine |-->
/// | +---->| |
/// | | +---------+
-/// +---------+ | |
+/// +---------+ | |
/// | |-------+ |
/// | Input 3 | |
/// | |-----------------+
@@ -392,7 +391,7 @@ impl ExecutionPlan for InterleaveExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- debug!("Start InterleaveExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ trace!("Start InterleaveExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
// record the tiny amount of work done in this function so
// elapsed_compute is reported as non zero
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs
index 9a9408035d..9a84af5523 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -26,7 +26,7 @@ use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use futures::Stream;
use futures::StreamExt;
-use log::debug;
+use log::trace;
use std::time::Instant;
use std::{any::Any, sync::Arc};
@@ -78,7 +78,7 @@ impl ExecutionPlan for UnnestExec {
/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but its input(s) are
- /// infinite, returns an error to indicate this.
+ /// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children[0])
}
@@ -208,7 +208,7 @@ impl UnnestStream {
Some(result)
}
other => {
- debug!(
+ trace!(
"Processed {} probe-side input batches containing {} rows and \
produced {} output batches containing {} rows in {} ms",
self.num_input_batches,