You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/02 15:50:33 UTC

[arrow-datafusion] branch main updated: Minor: Clean up `use`s to point at real crates (#6515)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3466522779 Minor: Clean up `use`s to point at real crates (#6515)
3466522779 is described below

commit 34665227796e0c2caf6421ce390e4aca311e01e3
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Jun 2 11:50:27 2023 -0400

    Minor: Clean up `use`s to point at real crates (#6515)
    
    * Minor: Clean up includes
    
    * fmt
---
 datafusion/core/src/execution/mod.rs                         |  8 +-------
 .../src/physical_plan/aggregates/bounded_aggregate_stream.rs |  6 +++---
 datafusion/core/src/physical_plan/aggregates/mod.rs          |  7 ++++---
 datafusion/core/src/physical_plan/aggregates/no_grouping.rs  |  4 ++--
 datafusion/core/src/physical_plan/aggregates/row_hash.rs     |  6 +++---
 datafusion/core/src/physical_plan/analyze.rs                 |  2 +-
 datafusion/core/src/physical_plan/coalesce_batches.rs        |  2 +-
 datafusion/core/src/physical_plan/coalesce_partitions.rs     |  2 +-
 datafusion/core/src/physical_plan/common.rs                  |  2 +-
 datafusion/core/src/physical_plan/empty.rs                   |  2 +-
 datafusion/core/src/physical_plan/explain.rs                 |  2 +-
 datafusion/core/src/physical_plan/file_format/avro.rs        |  2 +-
 datafusion/core/src/physical_plan/file_format/csv.rs         |  2 +-
 datafusion/core/src/physical_plan/file_format/json.rs        |  2 +-
 datafusion/core/src/physical_plan/file_format/mod.rs         |  2 +-
 datafusion/core/src/physical_plan/file_format/parquet.rs     |  2 +-
 datafusion/core/src/physical_plan/filter.rs                  |  2 +-
 datafusion/core/src/physical_plan/insert.rs                  |  2 +-
 datafusion/core/src/physical_plan/joins/cross_join.rs        |  6 +++---
 datafusion/core/src/physical_plan/joins/nested_loop_join.rs  |  4 ++--
 datafusion/core/src/physical_plan/joins/sort_merge_join.rs   |  6 +++---
 .../core/src/physical_plan/joins/symmetric_hash_join.rs      |  2 +-
 datafusion/core/src/physical_plan/limit.rs                   |  2 +-
 datafusion/core/src/physical_plan/memory.rs                  |  2 +-
 datafusion/core/src/physical_plan/mod.rs                     |  2 +-
 datafusion/core/src/physical_plan/planner.rs                 |  6 +++---
 datafusion/core/src/physical_plan/projection.rs              |  2 +-
 datafusion/core/src/physical_plan/repartition/mod.rs         |  4 ++--
 datafusion/core/src/physical_plan/sorts/sort.rs              | 12 ++++++------
 .../core/src/physical_plan/sorts/sort_preserving_merge.rs    |  2 +-
 datafusion/core/src/physical_plan/streaming.rs               |  2 +-
 datafusion/core/src/physical_plan/union.rs                   |  2 +-
 datafusion/core/src/physical_plan/unnest.rs                  |  2 +-
 datafusion/core/src/physical_plan/values.rs                  |  2 +-
 .../src/physical_plan/windows/bounded_window_agg_exec.rs     |  2 +-
 datafusion/core/src/physical_plan/windows/window_agg_exec.rs |  4 ++--
 datafusion/execution/src/lib.rs                              |  2 ++
 37 files changed, 60 insertions(+), 63 deletions(-)

diff --git a/datafusion/core/src/execution/mod.rs b/datafusion/core/src/execution/mod.rs
index fa6c4e118e..7e757fabac 100644
--- a/datafusion/core/src/execution/mod.rs
+++ b/datafusion/core/src/execution/mod.rs
@@ -22,10 +22,4 @@ pub mod context;
 pub use crate::datasource::file_format::options;
 
 // backwards compatibility
-pub use datafusion_execution::disk_manager;
-pub use datafusion_execution::memory_pool;
-pub use datafusion_execution::registry;
-pub use datafusion_execution::runtime_env;
-
-pub use disk_manager::DiskManager;
-pub use registry::FunctionRegistry;
+pub use datafusion_execution::*;
diff --git a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
index bfdb6828ff..1eed235850 100644
--- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
+++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs
@@ -31,9 +31,6 @@ use futures::stream::{Stream, StreamExt};
 use hashbrown::raw::RawTable;
 use itertools::izip;
 
-use crate::execution::context::TaskContext;
-use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
     AggregationOrdering, GroupByOrderMode, PhysicalGroupBy, RowAccumulatorItem,
@@ -41,6 +38,9 @@ use crate::physical_plan::aggregates::{
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::TaskContext;
 
 use crate::physical_plan::aggregates::utils::{
     aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index d25f887b7f..9eba869290 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -17,7 +17,6 @@
 
 //! Aggregates functionalities
 
-use crate::execution::context::TaskContext;
 use crate::physical_plan::aggregates::{
     bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
     row_hash::GroupedHashAggregateStream,
@@ -32,6 +31,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::utils::longest_consecutive_prefix;
 use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::TaskContext;
 use datafusion_expr::Accumulator;
 use datafusion_physical_expr::{
     aggregate::row_accumulator::RowAccumulator,
@@ -1047,8 +1047,8 @@ fn evaluate_group_by(
 
 #[cfg(test)]
 mod tests {
-    use crate::execution::context::{SessionConfig, TaskContext};
-    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+    use super::*;
+    use crate::execution::context::SessionConfig;
     use crate::from_slice::FromSlice;
     use crate::physical_plan::aggregates::{
         get_finest_requirement, get_working_mode, AggregateExec, AggregateMode,
@@ -1063,6 +1063,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::{DataFusionError, Result, ScalarValue};
+    use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_physical_expr::expressions::{
         lit, ApproxDistinct, Column, Count, Median,
     };
diff --git a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
index 4c289eee9b..89d392f0b6 100644
--- a/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
+++ b/datafusion/core/src/physical_plan/aggregates/no_grouping.rs
@@ -17,7 +17,6 @@
 
 //! Aggregate without grouping columns
 
-use crate::execution::context::TaskContext;
 use crate::physical_plan::aggregates::{
     aggregate_expressions, create_accumulators, finalize_aggregation, AccumulatorItem,
     AggregateMode,
@@ -27,14 +26,15 @@ use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::BoxStream;
 use std::borrow::Cow;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use crate::physical_plan::filter::batch_filter;
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use futures::stream::{Stream, StreamExt};
 
 use super::AggregateExec;
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index d39bbd80d0..a1e0481757 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -29,9 +29,6 @@ use datafusion_physical_expr::hash_utils::create_hashes;
 use futures::ready;
 use futures::stream::{Stream, StreamExt};
 
-use crate::execution::context::TaskContext;
-use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use crate::physical_plan::aggregates::utils::{
     aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
     read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
@@ -49,6 +46,9 @@ use arrow::datatypes::DataType;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{Result, ScalarValue};
+use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::TaskContext;
 use datafusion_expr::Accumulator;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 84d74c512b..39d715761f 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -34,7 +34,7 @@ use tokio::task::JoinSet;
 use super::expressions::PhysicalSortExpr;
 use super::stream::RecordBatchStreamAdapter;
 use super::{Distribution, SendableRecordBatchStream};
-use crate::execution::context::TaskContext;
+use datafusion_execution::TaskContext;
 
 /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
 /// discards the results, and then prints out an annotated plan with metrics
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 82f37ea396..09ef972cb2 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -29,10 +29,10 @@ use crate::physical_plan::{
     RecordBatchStream, SendableRecordBatchStream,
 };
 
-use crate::execution::context::TaskContext;
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
+use datafusion_execution::TaskContext;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index fe667d1e6e..11d7021ca9 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -38,8 +38,8 @@ use crate::physical_plan::{
 };
 
 use super::SendableRecordBatchStream;
-use crate::execution::context::TaskContext;
 use crate::physical_plan::common::spawn_execution;
+use datafusion_execution::TaskContext;
 
 /// Merge execution plan executes partitions in parallel and combines them into a single
 /// partition. No guarantees are made about the order of the resulting partition.
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index e766c225b5..98239557cb 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -19,13 +19,13 @@
 
 use super::SendableRecordBatchStream;
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::MemoryReservation;
 use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
 use arrow::datatypes::Schema;
 use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
 use arrow::record_batch::RecordBatch;
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::expressions::{BinaryExpr, Column};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use futures::{Future, StreamExt, TryStreamExt};
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 33258f28fa..c0c500053a 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -32,7 +32,7 @@ use log::trace;
 use super::expressions::PhysicalSortExpr;
 use super::{common, SendableRecordBatchStream, Statistics};
 
-use crate::execution::context::TaskContext;
+use datafusion_execution::TaskContext;
 
 /// Execution plan for empty relation (produces no rows)
 #[derive(Debug)]
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index 6eb72e4ff3..1b9e7809d5 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -29,8 +29,8 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc
 use log::trace;
 
 use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
-use crate::execution::context::TaskContext;
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion_execution::TaskContext;
 
 /// Explain execution plan operator. This operator contains the string
 /// values of the various plans it has when it is created, and passes
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 5b407d5498..9adf492d7d 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -17,13 +17,13 @@
 
 //! Execution plan for reading line-delimited Avro files
 use crate::error::Result;
-use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
     Partitioning, SendableRecordBatchStream, Statistics,
 };
+use datafusion_execution::TaskContext;
 
 use arrow::datatypes::SchemaRef;
 use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 1dde376315..2a68d98861 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -19,7 +19,6 @@
 
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::TaskContext;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::file_format::file_stream::{
@@ -33,6 +32,7 @@ use crate::physical_plan::{
 };
 use arrow::csv;
 use arrow::datatypes::SchemaRef;
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
 
 use bytes::{Buf, Bytes};
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index dcf23fdb25..10f249d4e7 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -18,7 +18,6 @@
 //! Execution plan for reading line-delimited JSON files
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::TaskContext;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::file_format::file_stream::{
@@ -30,6 +29,7 @@ use crate::physical_plan::{
     ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
     Partitioning, SendableRecordBatchStream, Statistics,
 };
+use datafusion_execution::TaskContext;
 
 use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index f96806544f..3b737f03d8 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -129,7 +129,7 @@ pub struct FileScanConfig {
     /// [`RuntimeEnv::object_store`]
     ///
     /// [`ObjectStore`]: object_store::ObjectStore
-    /// [`RuntimeEnv::object_store`]: crate::execution::runtime_env::RuntimeEnv::object_store
+    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
     pub object_store_url: ObjectStoreUrl,
     /// Schema before `projection` is applied. It contains the all columns that may
     /// appear in the files. It does not include table partition columns
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 3b622423f2..800da3a177 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -781,12 +781,12 @@ mod tests {
     // See also `parquet_exec` integration test
 
     use super::*;
+    use crate::datasource::file_format::options::CsvReadOptions;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::context::SessionState;
-    use crate::execution::options::CsvReadOptions;
     use crate::physical_plan::displayable;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 74f6caa76b..e615ece452 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -41,7 +41,7 @@ use datafusion_physical_expr::{split_conjunction, AnalysisContext};
 
 use log::trace;
 
-use crate::execution::context::TaskContext;
+use datafusion_execution::TaskContext;
 use futures::stream::{Stream, StreamExt};
 
 /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index 904348c574..e1252fbab7 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -34,10 +34,10 @@ use std::any::Any;
 use std::fmt::{Debug, Display};
 use std::sync::Arc;
 
-use crate::execution::context::TaskContext;
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
 use crate::physical_plan::Distribution;
 use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
 
 /// `DataSink` implements writing streams of [`RecordBatch`]es to
 /// user defined destinations.
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 39d0804c05..c445bfba2f 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -25,7 +25,6 @@ use std::{any::Any, sync::Arc, task::Poll};
 use arrow::datatypes::{Fields, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 
-use crate::execution::context::TaskContext;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
@@ -37,6 +36,7 @@ use crate::{error::Result, scalar::ScalarValue};
 use async_trait::async_trait;
 use datafusion_common::DataFusionError;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::TaskContext;
 
 use super::utils::{
     adjust_right_output_partitioning, cross_join_equivalence_properties,
@@ -158,7 +158,7 @@ impl ExecutionPlan for CrossJoinExec {
 
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but its input(s) are
-    /// infinite, returns an error to indicate this.    
+    /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] || children[1] {
             Err(DataFusionError::Plan(
@@ -457,10 +457,10 @@ mod tests {
     use super::*;
     use crate::assert_batches_sorted_eq;
     use crate::common::assert_contains;
-    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::physical_plan::common;
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::{build_table_scan_i32, columns};
+    use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 
     async fn join_collect(
         left: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
index 17d1eddf55..679ae5b3c5 100644
--- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
+++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs
@@ -49,9 +49,9 @@ use std::sync::Arc;
 use std::task::Poll;
 
 use crate::error::Result;
-use crate::execution::context::TaskContext;
-use crate::execution::memory_pool::MemoryConsumer;
 use crate::physical_plan::coalesce_batches::concat_batches;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
 
 /// Data of the inner table side
 type JoinLeftData = (RecordBatch, MemoryReservation);
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 93b3a68622..85bd18e592 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -40,8 +40,6 @@ use futures::{Stream, StreamExt};
 
 use crate::error::DataFusionError;
 use crate::error::Result;
-use crate::execution::context::TaskContext;
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use crate::logical_expr::JoinType;
 use crate::physical_plan::expressions::Column;
 use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -54,6 +52,8 @@ use crate::physical_plan::{
     metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
     Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::TaskContext;
 
 use datafusion_common::tree_node::{Transformed, TreeNode};
 
@@ -1396,7 +1396,6 @@ mod tests {
 
     use crate::common::assert_contains;
     use crate::error::Result;
-    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::logical_expr::JoinType;
     use crate::physical_plan::expressions::Column;
     use crate::physical_plan::joins::utils::JoinOn;
@@ -1406,6 +1405,7 @@ mod tests {
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::{build_table_i32, columns};
     use crate::{assert_batches_eq, assert_batches_sorted_eq};
+    use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 
     fn build_table(
         a: (&str, &Vec<i32>),
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 3bd177e03b..b5c5b06c3f 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -51,7 +51,6 @@ use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound};
 
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::TaskContext;
 use crate::logical_expr::JoinType;
 use crate::physical_plan::common::SharedMemoryReservation;
 use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
@@ -72,6 +71,7 @@ use crate::physical_plan::{
     DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
+use datafusion_execution::TaskContext;
 
 const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
 
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 1adec23252..ff13f92635 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -39,7 +39,7 @@ use super::{
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 
-use crate::execution::context::TaskContext;
+use datafusion_execution::TaskContext;
 
 /// Limit execution plan
 #[derive(Debug)]
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index 79d6f6c1cc..02bab7b82a 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -30,8 +30,8 @@ use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::execution::context::TaskContext;
 use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
 use futures::Stream;
 
 /// Execution plan for reading in-memory batches of data
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 2c3a3e9b49..155d79e7e8 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -716,10 +716,10 @@ pub mod unnest;
 pub mod values;
 pub mod windows;
 
-use crate::execution::context::TaskContext;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_execution::TaskContext;
 pub use datafusion_physical_expr::{
     expressions, functions, hash_utils, type_coercion, udf,
 };
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 4527440906..6f45b7b545 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1927,10 +1927,8 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::file_format::options::CsvReadOptions;
     use crate::datasource::MemTable;
-    use crate::execution::context::TaskContext;
-    use crate::execution::options::CsvReadOptions;
-    use crate::execution::runtime_env::RuntimeEnv;
     use crate::physical_plan::SendableRecordBatchStream;
     use crate::physical_plan::{
         expressions, DisplayFormatType, Partitioning, PhysicalPlanner, Statistics,
@@ -1943,6 +1941,8 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use datafusion_common::{assert_contains, TableReference};
     use datafusion_common::{DFField, DFSchema, DFSchemaRef};
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_execution::TaskContext;
     use datafusion_expr::{
         col, lit, sum, Extension, GroupingSet, LogicalPlanBuilder,
         UserDefinedLogicalNodeCore,
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index f2775079fc..40096a5415 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -27,13 +27,13 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::error::Result;
-use crate::execution::context::TaskContext;
 use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan,
     Partitioning, PhysicalExpr,
 };
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
+use datafusion_execution::TaskContext;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 32605cd977..369063dca4 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -42,7 +42,7 @@ use super::expressions::PhysicalSortExpr;
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream};
 
-use crate::execution::context::TaskContext;
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
 use futures::stream::Stream;
 use futures::{FutureExt, StreamExt};
@@ -685,7 +685,6 @@ impl RecordBatchStream for RepartitionStream {
 mod tests {
     use super::*;
     use crate::execution::context::SessionConfig;
-    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use crate::from_slice::FromSlice;
     use crate::prelude::SessionContext;
     use crate::test::create_vec_batches;
@@ -704,6 +703,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::cast::as_string_array;
+    use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use futures::FutureExt;
     use std::collections::HashSet;
 
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index 35dac19b27..3e3a79495b 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -20,11 +20,6 @@
 //! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::TaskContext;
-use crate::execution::memory_pool::{
-    human_readable_size, MemoryConsumer, MemoryReservation,
-};
-use crate::execution::runtime_env::RuntimeEnv;
 use crate::physical_plan::common::{batch_byte_size, spawn_buffered, IPCWriter};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
@@ -41,6 +36,11 @@ use arrow::compute::{concat_batches, lexsort_to_indices, take};
 use arrow::datatypes::SchemaRef;
 use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
+use datafusion_execution::memory_pool::{
+    human_readable_size, MemoryConsumer, MemoryReservation,
+};
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::EquivalenceProperties;
 use futures::{StreamExt, TryStreamExt};
 use log::{debug, error, trace};
@@ -648,7 +648,6 @@ impl ExecutionPlan for SortExec {
 mod tests {
     use super::*;
     use crate::execution::context::SessionConfig;
-    use crate::execution::runtime_env::RuntimeConfig;
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::collect;
     use crate::physical_plan::expressions::col;
@@ -661,6 +660,7 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::*;
     use datafusion_common::cast::{as_primitive_array, as_string_array};
+    use datafusion_execution::runtime_env::RuntimeConfig;
     use futures::FutureExt;
     use std::collections::HashMap;
 
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index e346eccbee..95cc23a20c 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -24,7 +24,6 @@ use arrow::datatypes::SchemaRef;
 use log::{debug, trace};
 
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::TaskContext;
 use crate::physical_plan::common::spawn_buffered;
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
@@ -34,6 +33,7 @@ use crate::physical_plan::{
     expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan,
     Partitioning, SendableRecordBatchStream, Statistics,
 };
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
 
 /// Sort preserving merge execution plan
diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs
index ff5d88dd31..0555c1ce28 100644
--- a/datafusion/core/src/physical_plan/streaming.rs
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -28,9 +28,9 @@ use datafusion_common::{DataFusionError, Result, Statistics};
 use datafusion_physical_expr::PhysicalSortExpr;
 
 use crate::datasource::streaming::PartitionStream;
-use crate::execution::context::TaskContext;
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
 use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
+use datafusion_execution::TaskContext;
 
 /// An [`ExecutionPlan`] for [`PartitionStream`]
 pub struct StreamingTableExec {
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index d1f5ec0c29..5cf25fbe02 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -40,12 +40,12 @@ use super::{
     ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
-use crate::execution::context::TaskContext;
 use crate::physical_plan::common::get_meet_of_orderings;
 use crate::{
     error::Result,
     physical_plan::{expressions, metrics::BaselineMetrics},
 };
+use datafusion_execution::TaskContext;
 use tokio::macros::support::thread_rng_n;
 
 /// `UnionExec`: `UNION ALL` execution plan.
diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs
index 9a84af5523..3759e06b76 100644
--- a/datafusion/core/src/physical_plan/unnest.rs
+++ b/datafusion/core/src/physical_plan/unnest.rs
@@ -24,13 +24,13 @@ use arrow::array::{
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion_execution::TaskContext;
 use futures::Stream;
 use futures::StreamExt;
 use log::trace;
 use std::time::Instant;
 use std::{any::Any, sync::Arc};
 
-use crate::execution::context::TaskContext;
 use crate::physical_plan::{
     coalesce_batches::concat_batches, expressions::Column, DisplayFormatType,
     Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr,
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index 2ac2dd1ae0..cbc6dbf6b7 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -20,7 +20,6 @@
 use super::expressions::PhysicalSortExpr;
 use super::{common, SendableRecordBatchStream, Statistics};
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::TaskContext;
 use crate::physical_plan::{
     memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning,
     PhysicalExpr,
@@ -29,6 +28,7 @@ use crate::scalar::ScalarValue;
 use arrow::array::new_null_array;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
+use datafusion_execution::TaskContext;
 use std::any::Any;
 use std::sync::Arc;
 
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 95b482ef24..09d3a7cbb3 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -21,7 +21,6 @@
 //! infinite inputs.
 
 use crate::error::Result;
-use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
@@ -33,6 +32,7 @@ use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
     RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
 };
+use datafusion_execution::TaskContext;
 
 use ahash::RandomState;
 use arrow::{
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index dc0302d77b..50aaf8b2fc 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -18,7 +18,6 @@
 //! Stream and channel implementations for window function expressions.
 
 use crate::error::Result;
-use crate::execution::context::TaskContext;
 use crate::physical_plan::common::transpose;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
@@ -42,6 +41,7 @@ use arrow::{
 };
 use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
 use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{OrderingEquivalenceProperties, PhysicalSortRequirement};
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
@@ -145,7 +145,7 @@ impl ExecutionPlan for WindowAggExec {
 
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but its input(s) are
-    /// infinite, returns an error to indicate this.    
+    /// infinite, returns an error to indicate this.
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
         if children[0] {
             Err(DataFusionError::Plan(
diff --git a/datafusion/execution/src/lib.rs b/datafusion/execution/src/lib.rs
index 07357e0579..46ffe12942 100644
--- a/datafusion/execution/src/lib.rs
+++ b/datafusion/execution/src/lib.rs
@@ -25,4 +25,6 @@ pub mod registry;
 pub mod runtime_env;
 mod task;
 
+pub use disk_manager::DiskManager;
+pub use registry::FunctionRegistry;
 pub use task::TaskContext;