You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/27 21:45:47 UTC

[arrow-datafusion] branch master updated: Set query_execution_start_time on snapshot from SessionContext (#4747) (#4750)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 981a9bbc2 Set query_execution_start_time on snapshot from SessionContext (#4747) (#4750)
981a9bbc2 is described below

commit 981a9bbc229288da5293a4a5f0478bdf5ac358df
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Dec 27 21:45:42 2022 +0000

    Set query_execution_start_time on snapshot from SessionContext (#4747) (#4750)
    
    * Set query_execution_start_time on snapshot from SessionContext (#4747)
    
    * Remove TODO
    
    * Clippy
---
 datafusion/core/src/dataframe.rs                | 20 +++++++-------------
 datafusion/core/src/datasource/view.rs          |  5 +----
 datafusion/core/src/execution/context.rs        | 25 ++++++-------------------
 datafusion/physical-expr/src/execution_props.rs |  8 +++++---
 4 files changed, 19 insertions(+), 39 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 60d79490c..0d6bae09a 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -87,13 +87,7 @@ impl DataFrame {
     }
 
     /// Create a physical plan
-    pub async fn create_physical_plan(mut self) -> Result<Arc<dyn ExecutionPlan>> {
-        self.create_physical_plan_impl().await
-    }
-
-    /// Temporary pending #4626
-    async fn create_physical_plan_impl(&mut self) -> Result<Arc<dyn ExecutionPlan>> {
-        self.session_state.execution_props.start_execution();
+    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
         self.session_state.create_physical_plan(&self.plan).await
     }
 
@@ -627,24 +621,24 @@ impl DataFrame {
     }
 
     /// Write a `DataFrame` to a CSV file.
-    pub async fn write_csv(mut self, path: &str) -> Result<()> {
-        let plan = self.create_physical_plan_impl().await?;
+    pub async fn write_csv(self, path: &str) -> Result<()> {
+        let plan = self.session_state.create_physical_plan(&self.plan).await?;
         plan_to_csv(&self.session_state, plan, path).await
     }
 
     /// Write a `DataFrame` to a Parquet file.
     pub async fn write_parquet(
-        mut self,
+        self,
         path: &str,
         writer_properties: Option<WriterProperties>,
     ) -> Result<()> {
-        let plan = self.create_physical_plan_impl().await?;
+        let plan = self.session_state.create_physical_plan(&self.plan).await?;
         plan_to_parquet(&self.session_state, plan, path, writer_properties).await
     }
 
     /// Executes a query and writes the results to a partitioned JSON file.
-    pub async fn write_json(mut self, path: impl AsRef<str>) -> Result<()> {
-        let plan = self.create_physical_plan_impl().await?;
+    pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
+        let plan = self.session_state.create_physical_plan(&self.plan).await?;
         plan_to_json(&self.session_state, plan, path).await
     }
 
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index 271051096..4fae03fad 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -108,9 +108,6 @@ impl TableProvider for ViewTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // clone state and start_execution so that now() works in views
-        let mut state_cloned = state.clone();
-        state_cloned.execution_props.start_execution();
         let plan = if let Some(projection) = projection {
             // avoiding adding a redundant projection (e.g. SELECT * FROM view)
             let current_projection =
@@ -144,7 +141,7 @@ impl TableProvider for ViewTable {
             plan = plan.limit(0, Some(limit))?;
         }
 
-        state_cloned.create_physical_plan(&plan.build()?).await
+        state.create_physical_plan(&plan.build()?).await
     }
 }
 
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index fcfcf1ca6..9c909d5d6 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1000,23 +1000,7 @@ impl SessionContext {
         &self,
         logical_plan: &LogicalPlan,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let state_cloned = {
-            let mut state = self.state.write();
-            state.execution_props.start_execution();
-
-            // We need to clone `state` to release the lock that is not `Send`. We could
-            // make the lock `Send` by using `tokio::sync::Mutex`, but that would require to
-            // propagate async even to the `LogicalPlan` building methods.
-            // Cloning `state` here is fine as we then pass it as immutable `&state`, which
-            // means that we avoid write consistency issues as the cloned version will not
-            // be written to. As for eventual modifications that would be applied to the
-            // original state after it has been cloned, they will not be picked up by the
-            // clone but that is okay, as it is equivalent to postponing the state update
-            // by keeping the lock until the end of the function scope.
-            state.clone()
-        };
-
-        state_cloned.create_physical_plan(logical_plan).await
+        self.state().create_physical_plan(logical_plan).await
     }
 
     /// Executes a query and writes the results to a partitioned CSV file.
@@ -1055,9 +1039,12 @@ impl SessionContext {
         Arc::new(TaskContext::from(self))
     }
 
-    /// Get a copy of the [`SessionState`] of this [`SessionContext`]
+    /// Snapshots the [`SessionState`] of this [`SessionContext`] setting the
+    /// `query_execution_start_time` to the current time
     pub fn state(&self) -> SessionState {
-        self.state.read().clone()
+        let mut state = self.state.read().clone();
+        state.execution_props.start_execution();
+        state
     }
 }
 
diff --git a/datafusion/physical-expr/src/execution_props.rs b/datafusion/physical-expr/src/execution_props.rs
index 2e6820666..800e35583 100644
--- a/datafusion/physical-expr/src/execution_props.rs
+++ b/datafusion/physical-expr/src/execution_props.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::var_provider::{VarProvider, VarType};
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, TimeZone, Utc};
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -44,14 +44,16 @@ impl ExecutionProps {
     /// Creates a new execution props
     pub fn new() -> Self {
         ExecutionProps {
-            query_execution_start_time: chrono::Utc::now(),
+            // Set this to a fixed sentinel to make it obvious if this is
+            // not being updated / propagated correctly
+            query_execution_start_time: Utc.timestamp_nanos(0),
             var_providers: None,
         }
     }
 
     /// Marks the execution of query started timestamp
     pub fn start_execution(&mut self) -> &Self {
-        self.query_execution_start_time = chrono::Utc::now();
+        self.query_execution_start_time = Utc::now();
         &*self
     }