You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/23 07:56:54 UTC

[arrow-datafusion] branch master updated: fix issue with now() returning same value across statements (#3210)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new eedc7874c fix issue with now() returning same value across statements (#3210)
eedc7874c is described below

commit eedc7874c4ed371d4df6f4b36690f78da82e2524
Author: kmitchener <km...@gmail.com>
AuthorDate: Tue Aug 23 03:56:48 2022 -0400

    fix issue with now() returning same value across statements (#3210)
---
 datafusion/core/src/dataframe.rs                   |  21 ++++-
 datafusion/core/src/datasource/view.rs             |   7 +-
 datafusion/core/src/execution/context.rs           |  10 +--
 datafusion/core/src/logical_plan/mod.rs            |   8 +-
 datafusion/core/src/physical_plan/mod.rs           |   8 +-
 datafusion/core/tests/sql/timestamp.rs             | 100 +++++++++++++++++----
 datafusion/expr/src/expr_fn.rs                     |  12 ++-
 .../physical-expr/src/datetime_expressions.rs      |   4 +-
 datafusion/proto/src/from_proto.rs                 |   9 +-
 docs/source/user-guide/sql/scalar_functions.md     |   5 +-
 10 files changed, 133 insertions(+), 51 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 66a5671f3..bb572b7d5 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -84,8 +84,25 @@ impl DataFrame {
 
     /// Create a physical plan
     pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
-        let state = self.session_state.read().clone();
-        state.create_physical_plan(&self.plan).await
+        // this function is copied from the `SessionContext` function of the
+        // same name
+        let state_cloned = {
+            let mut state = self.session_state.write();
+            state.execution_props.start_execution();
+
+            // We need to clone `state` to release the lock that is not `Send`. We could
+            // make the lock `Send` by using `tokio::sync::Mutex`, but that would require
+            // propagating async even to the `LogicalPlan` building methods.
+            // Cloning `state` here is fine as we then pass it as immutable `&state`, which
+            // means that we avoid write consistency issues as the cloned version will not
+            // be written to. As for any modifications that may be applied to the
+            // original state after it has been cloned, they will not be picked up by the
+            // clone, but that is okay, as it is equivalent to postponing the state update
+            // by keeping the lock until the end of the function scope.
+            state.clone()
+        };
+
+        state_cloned.create_physical_plan(&self.plan).await
     }
 
     /// Filter the DataFrame by column. Returns a new DataFrame only containing the
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index d7ec155d1..c93d83f27 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -80,12 +80,15 @@ impl TableProvider for ViewTable {
 
     async fn scan(
         &self,
-        ctx: &SessionState,
+        state: &SessionState,
         _projection: &Option<Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        ctx.create_physical_plan(&self.logical_plan).await
+        // clone state and start_execution so that now() works in views
+        let mut state_cloned = state.clone();
+        state_cloned.execution_props.start_execution();
+        state_cloned.create_physical_plan(&self.logical_plan).await
     }
 }
 
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 64403715c..9c9ed9526 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -365,20 +365,18 @@ impl SessionContext {
                 match (or_replace, view) {
                     (true, Ok(_)) => {
                         self.deregister_table(name.as_str())?;
-                        let plan = self.optimize(&input)?;
                         let table =
-                            Arc::new(ViewTable::try_new(plan.clone(), definition)?);
+                            Arc::new(ViewTable::try_new((*input).clone(), definition)?);
 
                         self.register_table(name.as_str(), table)?;
-                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &input)))
                     }
                     (_, Err(_)) => {
-                        let plan = self.optimize(&input)?;
                         let table =
-                            Arc::new(ViewTable::try_new(plan.clone(), definition)?);
+                            Arc::new(ViewTable::try_new((*input).clone(), definition)?);
 
                         self.register_table(name.as_str(), table)?;
-                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &input)))
                     }
                     (false, Ok(_)) => Err(DataFusionError::Execution(format!(
                         "Table '{:?}' already exists",
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index 87a02ae01..a79e9f5e0 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -50,10 +50,10 @@ pub use datafusion_expr::{
         StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
         UserDefinedLogicalNode, Values,
     },
-    lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr,
-    nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat,
-    replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384,
-    sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
+    lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif,
+    octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace,
+    reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, sha512,
+    signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
     to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
     trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
 };
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 5e5f38ffc..4613be7cd 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -120,12 +120,12 @@ pub struct ColumnStatistics {
 
 /// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
 ///
-/// Each `ExecutionPlan` is Partition-aware and is responsible for
+/// Each `ExecutionPlan` is partition-aware and is responsible for
 /// creating the actual `async` [`SendableRecordBatchStream`]s
 /// of [`RecordBatch`] that incrementally compute the operator's
 /// output from its input partition.
 ///
-/// [`ExecutionPlan`] can be displayed in an simplified form using the
+/// [`ExecutionPlan`] can be displayed in a simplified form using the
 /// return value from [`displayable`] in addition to the (normally
 /// quite verbose) `Debug` output.
 pub trait ExecutionPlan: Debug + Send + Sync {
@@ -168,7 +168,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// The default implementation returns `true`
     ///
     /// WARNING: if you override this default and return `false`, your
-    /// operator can not rely on datafusion preserving the input order
+    /// operator can not rely on DataFusion preserving the input order
     /// as it will likely not.
     fn relies_on_input_order(&self) -> bool {
         true
@@ -200,7 +200,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// parallelism may outweigh any benefits
     ///
     /// The default implementation returns `true` unless this operator
-    /// has signalled it requiers a single child input partition.
+    /// has signalled it requires a single child input partition.
     fn benefits_from_input_partitioning(&self) -> bool {
         // By default try to maximize parallelism with more CPUs if
         // possible
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 9f9004d6b..57900bd29 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -436,31 +436,93 @@ async fn test_current_timestamp_expressions() -> Result<()> {
 }
 
 #[tokio::test]
-async fn test_current_timestamp_expressions_non_optimized() -> Result<()> {
-    let t1 = chrono::Utc::now().timestamp();
+async fn test_now_in_same_stmt_using_sql_function() -> Result<()> {
     let ctx = SessionContext::new();
-    let sql = "SELECT NOW(), NOW() as t2";
 
-    let msg = format!("Creating logical plan for '{}'", sql);
-    let plan = ctx.create_logical_plan(sql).expect(&msg);
+    let df1 = ctx.sql("select now(), now() as now2").await?;
+    let result = result_vec(&df1.collect().await?);
+    assert_eq!(result[0][0], result[0][1]);
 
-    let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-    let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
+    Ok(())
+}
 
-    let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
-    let task_ctx = ctx.task_ctx();
-    let res = collect(plan, task_ctx).await.expect(&msg);
-    let actual = result_vec(&res);
+#[tokio::test]
+async fn test_now_across_statements() -> Result<()> {
+    let ctx = SessionContext::new();
 
-    let res1 = actual[0][0].as_str();
-    let res2 = actual[0][1].as_str();
-    let t3 = chrono::Utc::now().timestamp();
-    let t2_naive =
-        chrono::NaiveDateTime::parse_from_str(res1, "%Y-%m-%d %H:%M:%S%.6f").unwrap();
+    let actual1 = execute(&ctx, "SELECT NOW()").await;
+    let res1 = actual1[0][0].as_str();
 
-    let t2 = t2_naive.timestamp();
-    assert!(t1 <= t2 && t2 <= t3);
-    assert_eq!(res2, res1);
+    let actual2 = execute(&ctx, "SELECT NOW()").await;
+    let res2 = actual2[0][0].as_str();
+
+    assert!(res1 < res2);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_now_across_statements_using_sql_function() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let df1 = ctx.sql("select now()").await?;
+    let rb1 = df1.collect().await?;
+    let result1 = result_vec(&rb1);
+    let res1 = result1[0][0].as_str();
+
+    let df2 = ctx.sql("select now()").await?;
+    let rb2 = df2.collect().await?;
+    let result2 = result_vec(&rb2);
+    let res2 = result2[0][0].as_str();
+
+    assert!(res1 < res2);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_now_dataframe_api() -> Result<()> {
+    let ctx = SessionContext::new();
+    let df = ctx.sql("select 1").await?; // use this to get a DataFrame
+    let df = df.select(vec![now(), now().alias("now2")])?;
+    let result = result_vec(&df.collect().await?);
+    assert_eq!(result[0][0], result[0][1]);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_now_dataframe_api_across_statements() -> Result<()> {
+    let ctx = SessionContext::new();
+    let df = ctx.sql("select 1").await?; // use this to get a DataFrame
+    let df = df.select(vec![now()])?;
+    let result = result_vec(&df.collect().await?);
+
+    let df = ctx.sql("select 1").await?;
+    let df = df.select(vec![now()])?;
+    let result2 = result_vec(&df.collect().await?);
+
+    assert_ne!(result[0][0], result2[0][0]);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_now_in_view() -> Result<()> {
+    let ctx = SessionContext::new();
+    let _df = ctx
+        .sql("create or replace view test_now as select now()")
+        .await?
+        .collect()
+        .await?;
+
+    let df = ctx.sql("select * from test_now").await?;
+    let result = result_vec(&df.collect().await?);
+
+    let df1 = ctx.sql("select * from test_now").await?;
+    let result2 = result_vec(&df1.collect().await?);
+
+    assert_ne!(result[0][0], result2[0][0]);
 
     Ok(())
 }
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index d23a14ced..9b51273a0 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -367,10 +367,8 @@ nary_scalar_expr!(Btrim, btrim);
 //there is a func concat_ws before, so use concat_ws_expr as name.c
 nary_scalar_expr!(ConcatWithSeparator, concat_ws_expr);
 nary_scalar_expr!(Concat, concat_expr);
-nary_scalar_expr!(Now, now_expr);
 
 // date functions
-unary_scalar_expr!(Now, now, "current time"); //TODO this is not a unary expression https://github.com/apache/arrow-datafusion/issues/3069
 scalar_expr!(DatePart, date_part, part, date);
 scalar_expr!(DateTrunc, date_trunc, part, date);
 scalar_expr!(DateBin, date_bin, stride, source, origin);
@@ -398,6 +396,15 @@ pub fn coalesce(args: Vec<Expr>) -> Expr {
     }
 }
 
+/// Returns the current timestamp in nanoseconds, using the same value for all instances of
+/// `now()` in the same statement.
+pub fn now() -> Expr {
+    Expr::ScalarFunction {
+        fun: BuiltinScalarFunction::Now,
+        args: vec![],
+    }
+}
+
 /// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
 pub fn case(expr: Expr) -> CaseBuilder {
     CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None)
@@ -564,7 +571,6 @@ mod test {
         test_unary_scalar_expr!(Atan, atan);
         test_unary_scalar_expr!(Floor, floor);
         test_unary_scalar_expr!(Ceil, ceil);
-        test_unary_scalar_expr!(Now, now);
         test_unary_scalar_expr!(Round, round);
         test_unary_scalar_expr!(Trunc, trunc);
         test_unary_scalar_expr!(Abs, abs);
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index e1e0fd822..bd33b8bc8 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -170,8 +170,8 @@ pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
 /// specified timestamp.
 ///
 /// The semantics of `now()` require it to return the same value
-/// whenever it is called in a query. This this value is chosen during
-/// planning time and bound into a closure that
+/// wherever it appears within a single statement. This value is
+/// chosen during planning time.
 pub fn make_now(
     now_ts: DateTime<Utc>,
 ) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue> {
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index 97b21c4cd..ca22eb436 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -36,7 +36,7 @@ use datafusion_expr::{
     character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, date_bin,
     date_part, date_trunc, digest, exp, floor, from_unixtime, left, ln, log10, log2,
     logical_plan::{PlanType, StringifiedPlan},
-    lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match,
+    lower, lpad, ltrim, md5, now, nullif, octet_length, power, random, regexp_match,
     regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
     sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan,
     to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
@@ -1117,12 +1117,7 @@ pub fn parse_expr(
                 ScalarFunction::ToTimestampSeconds => {
                     Ok(to_timestamp_seconds(parse_expr(&args[0], registry)?))
                 }
-                ScalarFunction::Now => Ok(now_expr(
-                    args.to_owned()
-                        .iter()
-                        .map(|expr| parse_expr(expr, registry))
-                        .collect::<Result<Vec<_>, _>>()?,
-                )),
+                ScalarFunction::Now => Ok(now()),
                 ScalarFunction::Translate => Ok(translate(
                     parse_expr(&args[0], registry)?,
                     parse_expr(&args[1], registry)?,
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index 0791cdf3a..11725f90d 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -246,7 +246,7 @@ Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolut
 `extract(field FROM source)`
 
 - The `extract` function retrieves subfields such as year or hour from date/time values.
-  `source` must be a value expression of type timestamp, Data32, or Data64. `field` is an identifier that selects what field to extract from the source value.
+  `source` must be a value expression of type timestamp, Date32, or Date64. `field` is an identifier that selects what field to extract from the source value.
   The `extract` function returns values of type u32.
   - `year` :`extract(year FROM to_timestamp('2020-09-08T12:00:00+00:00')) -> 2020`
   - `month`:`extract(month FROM to_timestamp('2020-09-08T12:00:00+00:00')) -> 9`
@@ -273,7 +273,8 @@ Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolut
 
 ### `now`
 
-current time
+Returns the current time as `Timestamp(Nanoseconds, UTC)`. Returns the same value
+wherever the function appears in the statement, using a value chosen at planning time.
 
 ## Other Functions