You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/08/23 07:56:54 UTC
[arrow-datafusion] branch master updated: fix issue with now() returning same value across statements (#3210)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new eedc7874c fix issue with now() returning same value across statements (#3210)
eedc7874c is described below
commit eedc7874c4ed371d4df6f4b36690f78da82e2524
Author: kmitchener <km...@gmail.com>
AuthorDate: Tue Aug 23 03:56:48 2022 -0400
fix issue with now() returning same value across statements (#3210)
---
datafusion/core/src/dataframe.rs | 21 ++++-
datafusion/core/src/datasource/view.rs | 7 +-
datafusion/core/src/execution/context.rs | 10 +--
datafusion/core/src/logical_plan/mod.rs | 8 +-
datafusion/core/src/physical_plan/mod.rs | 8 +-
datafusion/core/tests/sql/timestamp.rs | 100 +++++++++++++++++----
datafusion/expr/src/expr_fn.rs | 12 ++-
.../physical-expr/src/datetime_expressions.rs | 4 +-
datafusion/proto/src/from_proto.rs | 9 +-
docs/source/user-guide/sql/scalar_functions.md | 5 +-
10 files changed, 133 insertions(+), 51 deletions(-)
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 66a5671f3..bb572b7d5 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -84,8 +84,25 @@ impl DataFrame {
/// Create a physical plan
pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
- let state = self.session_state.read().clone();
- state.create_physical_plan(&self.plan).await
+ // Mirrors `SessionContext::create_physical_plan`: call `start_execution()`
+ // first so each physical plan gets its own `now()` timestamp.
+ let state_cloned = {
+ let mut state = self.session_state.write();
+ state.execution_props.start_execution();
+
+ // We need to clone `state` to release the lock that is not `Send`. We could
+ // make the lock `Send` by using `tokio::sync::Mutex`, but that would require
+ // propagating async even into the `LogicalPlan` building methods.
+ // Cloning `state` here is fine as we then pass it as immutable `&state`, which
+ // means that we avoid write consistency issues as the cloned version will not
+ // be written to. As for eventual modifications that would be applied to the
+ // original state after it has been cloned, they will not be picked up by the
+ // clone but that is okay, as it is equivalent to postponing the state update
+ // by keeping the lock until the end of the function scope.
+ state.clone()
+ };
+
+ state_cloned.create_physical_plan(&self.plan).await
}
/// Filter the DataFrame by column. Returns a new DataFrame only containing the
diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs
index d7ec155d1..c93d83f27 100644
--- a/datafusion/core/src/datasource/view.rs
+++ b/datafusion/core/src/datasource/view.rs
@@ -80,12 +80,15 @@ impl TableProvider for ViewTable {
async fn scan(
&self,
- ctx: &SessionState,
+ state: &SessionState,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- ctx.create_physical_plan(&self.logical_plan).await
+ // Clone the state and call start_execution() so now() is fresh on each scan of the view
+ let mut state_cloned = state.clone();
+ state_cloned.execution_props.start_execution();
+ state_cloned.create_physical_plan(&self.logical_plan).await
}
}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 64403715c..9c9ed9526 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -365,20 +365,18 @@ impl SessionContext {
match (or_replace, view) {
(true, Ok(_)) => {
self.deregister_table(name.as_str())?;
- let plan = self.optimize(&input)?;
let table =
- Arc::new(ViewTable::try_new(plan.clone(), definition)?);
+ Arc::new(ViewTable::try_new((*input).clone(), definition)?);
self.register_table(name.as_str(), table)?;
- Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &input)))
}
(_, Err(_)) => {
- let plan = self.optimize(&input)?;
let table =
- Arc::new(ViewTable::try_new(plan.clone(), definition)?);
+ Arc::new(ViewTable::try_new((*input).clone(), definition)?);
self.register_table(name.as_str(), table)?;
- Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &input)))
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index 87a02ae01..a79e9f5e0 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -50,10 +50,10 @@ pub use datafusion_expr::{
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
},
- lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr,
- nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat,
- replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384,
- sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
+ lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, nullif,
+ octet_length, or, power, random, regexp_match, regexp_replace, repeat, replace,
+ reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, sha512,
+ signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
};
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 5e5f38ffc..4613be7cd 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -120,12 +120,12 @@ pub struct ColumnStatistics {
/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
///
-/// Each `ExecutionPlan` is Partition-aware and is responsible for
+/// Each `ExecutionPlan` is partition-aware and is responsible for
/// creating the actual `async` [`SendableRecordBatchStream`]s
/// of [`RecordBatch`] that incrementally compute the operator's
/// output from its input partition.
///
-/// [`ExecutionPlan`] can be displayed in an simplified form using the
+/// [`ExecutionPlan`] can be displayed in a simplified form using the
/// return value from [`displayable`] in addition to the (normally
/// quite verbose) `Debug` output.
pub trait ExecutionPlan: Debug + Send + Sync {
@@ -168,7 +168,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// The default implementation returns `true`
///
/// WARNING: if you override this default and return `false`, your
- /// operator can not rely on datafusion preserving the input order
+ /// operator can not rely on DataFusion preserving the input order
/// as it will likely not.
fn relies_on_input_order(&self) -> bool {
true
@@ -200,7 +200,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// parallelism may outweigh any benefits
///
/// The default implementation returns `true` unless this operator
- /// has signalled it requiers a single child input partition.
+ /// has signalled it requires a single child input partition.
fn benefits_from_input_partitioning(&self) -> bool {
// By default try to maximize parallelism with more CPUs if
// possible
diff --git a/datafusion/core/tests/sql/timestamp.rs b/datafusion/core/tests/sql/timestamp.rs
index 9f9004d6b..57900bd29 100644
--- a/datafusion/core/tests/sql/timestamp.rs
+++ b/datafusion/core/tests/sql/timestamp.rs
@@ -436,31 +436,93 @@ async fn test_current_timestamp_expressions() -> Result<()> {
}
#[tokio::test]
-async fn test_current_timestamp_expressions_non_optimized() -> Result<()> {
- let t1 = chrono::Utc::now().timestamp();
+async fn test_now_in_same_stmt_using_sql_function() -> Result<()> {
let ctx = SessionContext::new();
- let sql = "SELECT NOW(), NOW() as t2";
- let msg = format!("Creating logical plan for '{}'", sql);
- let plan = ctx.create_logical_plan(sql).expect(&msg);
+ let df1 = ctx.sql("select now(), now() as now2").await?;
+ let result = result_vec(&df1.collect().await?);
+ assert_eq!(result[0][0], result[0][1]);
- let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
- let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
+ Ok(())
+}
- let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
- let task_ctx = ctx.task_ctx();
- let res = collect(plan, task_ctx).await.expect(&msg);
- let actual = result_vec(&res);
+#[tokio::test]
+async fn test_now_across_statements() -> Result<()> {
+ let ctx = SessionContext::new();
- let res1 = actual[0][0].as_str();
- let res2 = actual[0][1].as_str();
- let t3 = chrono::Utc::now().timestamp();
- let t2_naive =
- chrono::NaiveDateTime::parse_from_str(res1, "%Y-%m-%d %H:%M:%S%.6f").unwrap();
+ let actual1 = execute(&ctx, "SELECT NOW()").await;
+ let res1 = actual1[0][0].as_str();
- let t2 = t2_naive.timestamp();
- assert!(t1 <= t2 && t2 <= t3);
- assert_eq!(res2, res1);
+ let actual2 = execute(&ctx, "SELECT NOW()").await;
+ let res2 = actual2[0][0].as_str();
+
+ assert!(res1 < res2);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_now_across_statements_using_sql_function() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let df1 = ctx.sql("select now()").await?;
+ let rb1 = df1.collect().await?;
+ let result1 = result_vec(&rb1);
+ let res1 = result1[0][0].as_str();
+
+ let df2 = ctx.sql("select now()").await?;
+ let rb2 = df2.collect().await?;
+ let result2 = result_vec(&rb2);
+ let res2 = result2[0][0].as_str();
+
+ assert!(res1 < res2);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_now_dataframe_api() -> Result<()> {
+ let ctx = SessionContext::new();
+ let df = ctx.sql("select 1").await?; // use this to get a DataFrame
+ let df = df.select(vec![now(), now().alias("now2")])?;
+ let result = result_vec(&df.collect().await?);
+ assert_eq!(result[0][0], result[0][1]);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_now_dataframe_api_across_statements() -> Result<()> {
+ let ctx = SessionContext::new();
+ let df = ctx.sql("select 1").await?; // use this to get a DataFrame
+ let df = df.select(vec![now()])?;
+ let result = result_vec(&df.collect().await?);
+
+ let df = ctx.sql("select 1").await?;
+ let df = df.select(vec![now()])?;
+ let result2 = result_vec(&df.collect().await?);
+
+ assert_ne!(result[0][0], result2[0][0]);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_now_in_view() -> Result<()> {
+ let ctx = SessionContext::new();
+ let _df = ctx
+ .sql("create or replace view test_now as select now()")
+ .await?
+ .collect()
+ .await?;
+
+ let df = ctx.sql("select * from test_now").await?;
+ let result = result_vec(&df.collect().await?);
+
+ let df1 = ctx.sql("select * from test_now").await?;
+ let result2 = result_vec(&df1.collect().await?);
+
+ assert_ne!(result[0][0], result2[0][0]);
Ok(())
}
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index d23a14ced..9b51273a0 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -367,10 +367,8 @@ nary_scalar_expr!(Btrim, btrim);
//there is a func concat_ws before, so use concat_ws_expr as name.c
nary_scalar_expr!(ConcatWithSeparator, concat_ws_expr);
nary_scalar_expr!(Concat, concat_expr);
-nary_scalar_expr!(Now, now_expr);
// date functions
-unary_scalar_expr!(Now, now, "current time"); //TODO this is not a unary expression https://github.com/apache/arrow-datafusion/issues/3069
scalar_expr!(DatePart, date_part, part, date);
scalar_expr!(DateTrunc, date_trunc, part, date);
scalar_expr!(DateBin, date_bin, stride, source, origin);
@@ -398,6 +396,15 @@ pub fn coalesce(args: Vec<Expr>) -> Expr {
}
}
+/// Returns the current timestamp in nanoseconds, using the same value for every `now()` call
+/// within the same statement.
+pub fn now() -> Expr {
+ Expr::ScalarFunction {
+ fun: BuiltinScalarFunction::Now,
+ args: vec![],
+ }
+}
+
/// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
pub fn case(expr: Expr) -> CaseBuilder {
CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None)
@@ -564,7 +571,6 @@ mod test {
test_unary_scalar_expr!(Atan, atan);
test_unary_scalar_expr!(Floor, floor);
test_unary_scalar_expr!(Ceil, ceil);
- test_unary_scalar_expr!(Now, now);
test_unary_scalar_expr!(Round, round);
test_unary_scalar_expr!(Trunc, trunc);
test_unary_scalar_expr!(Abs, abs);
diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs
index e1e0fd822..bd33b8bc8 100644
--- a/datafusion/physical-expr/src/datetime_expressions.rs
+++ b/datafusion/physical-expr/src/datetime_expressions.rs
@@ -170,8 +170,8 @@ pub fn to_timestamp_seconds(args: &[ColumnarValue]) -> Result<ColumnarValue> {
/// specified timestamp.
///
/// The semantics of `now()` require it to return the same value
-/// whenever it is called in a query. This this value is chosen during
-/// planning time and bound into a closure that
+/// wherever it appears within a single statement. This value is
+/// chosen at planning time and captured by the returned closure.
pub fn make_now(
now_ts: DateTime<Utc>,
) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue> {
diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs
index 97b21c4cd..ca22eb436 100644
--- a/datafusion/proto/src/from_proto.rs
+++ b/datafusion/proto/src/from_proto.rs
@@ -36,7 +36,7 @@ use datafusion_expr::{
character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, date_bin,
date_part, date_trunc, digest, exp, floor, from_unixtime, left, ln, log10, log2,
logical_plan::{PlanType, StringifiedPlan},
- lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match,
+ lower, lpad, ltrim, md5, now, nullif, octet_length, power, random, regexp_match,
regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
@@ -1117,12 +1117,7 @@ pub fn parse_expr(
ScalarFunction::ToTimestampSeconds => {
Ok(to_timestamp_seconds(parse_expr(&args[0], registry)?))
}
- ScalarFunction::Now => Ok(now_expr(
- args.to_owned()
- .iter()
- .map(|expr| parse_expr(expr, registry))
- .collect::<Result<Vec<_>, _>>()?,
- )),
+ ScalarFunction::Now => Ok(now()),
ScalarFunction::Translate => Ok(translate(
parse_expr(&args[0], registry)?,
parse_expr(&args[1], registry)?,
diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md
index 0791cdf3a..11725f90d 100644
--- a/docs/source/user-guide/sql/scalar_functions.md
+++ b/docs/source/user-guide/sql/scalar_functions.md
@@ -246,7 +246,7 @@ Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolut
`extract(field FROM source)`
- The `extract` function retrieves subfields such as year or hour from date/time values.
- `source` must be a value expression of type timestamp, Data32, or Data64. `field` is an identifier that selects what field to extract from the source value.
+ `source` must be a value expression of type timestamp, Date32, or Date64. `field` is an identifier that selects what field to extract from the source value.
The `extract` function returns values of type u32.
- `year` :`extract(year FROM to_timestamp('2020-09-08T12:00:00+00:00')) -> 2020`
- `month`:`extract(month FROM to_timestamp('2020-09-08T12:00:00+00:00')) -> 9`
@@ -273,7 +273,8 @@ Note that `CAST(.. AS Timestamp)` converts to Timestamps with Nanosecond resolut
### `now`
-current time
+Returns the current time as a `Timestamp(Nanosecond, UTC)` value. Every occurrence of `now()`
+in a statement returns the same value, which is chosen at planning time.
## Other Functions