Posted to reviews@spark.apache.org by "dbatomic (via GitHub)" <gi...@apache.org> on 2023/12/12 13:57:04 UTC

[PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

dbatomic opened a new pull request, #44316:
URL: https://github.com/apache/spark/pull/44316

   
   ### What changes were proposed in this pull request?
   
   Call `ComputeCurrentTime` before applying the `ResolveInlineTables` rule. This ensures that all current-time-aware expressions are replaced before they are evaluated as part of inline table resolution.
   
   ### Why are the changes needed?
   
   Prior to this change, the following query would return 3.
   ```sql
   SELECT COUNT(DISTINCT ct) FROM VALUES
   (CURRENT_TIMESTAMP()),
   (CURRENT_TIMESTAMP()),
   (CURRENT_TIMESTAMP()) as data(ct)
   ```
   
   After this change, it returns 1, which is the correct result.
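
A toy illustration of the difference, in plain Scala rather than Spark code. `Expr`, `CurrentTimestamp`, `Literal` and `TickingClock` are hypothetical names invented for this sketch, and the fake clock advances on every read so that the outcome is deterministic:

```scala
object InlineTableClockDemo {
  // Hypothetical mini expression model; these are not Spark classes.
  sealed trait Expr
  case object CurrentTimestamp extends Expr
  final case class Literal(micros: Long) extends Expr

  // Fake clock that advances by one microsecond on every read.
  final class TickingClock(private var now: Long) {
    def read(): Long = { now += 1; now }
  }

  // Sketch of the old behaviour: every cell reads the clock on its own.
  def evalEagerly(rows: Seq[Expr], clock: TickingClock): Seq[Long] =
    rows.map {
      case CurrentTimestamp => clock.read()
      case Literal(v)       => v
    }

  // Sketch of the proposed behaviour: read the clock once, substitute the
  // same literal everywhere, and only then evaluate.
  def replaceThenEval(rows: Seq[Expr], clock: TickingClock): Seq[Long] = {
    val ts = clock.read()
    val replaced = rows.map {
      case CurrentTimestamp => Literal(ts)
      case other            => other
    }
    replaced.map {
      case Literal(v)       => v
      case CurrentTimestamp => sys.error("unreachable: every occurrence was replaced")
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(CurrentTimestamp, CurrentTimestamp, CurrentTimestamp)
    println(evalEagerly(rows, new TickingClock(0L)).distinct.size)     // prints 3
    println(replaceThenEval(rows, new TickingClock(0L)).distinct.size) // prints 1
  }
}
```

The distinct counts mirror the `COUNT(DISTINCT ct)` query above: three separate clock reads give three values, one shared literal gives one.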
   
   ### Does this PR introduce _any_ user-facing change?
   See section above.
   
   
   ### How was this patch tested?
   A new test that validates this behaviour was added.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1424291791


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -33,12 +34,14 @@ import org.apache.spark.sql.types.{StructField, StructType}
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsDown(p => ComputeCurrentTime(p)).resolveOperatorsWithPruning(

Review Comment:
   I thought about the same thing but there is this comment:
   
       // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
       // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
       // However, because we also use the analyzer to canonicalized queries (for view definition),
       // we do not eliminate subqueries or compute current time in the analyzer.
       
   I also wondered why we don't run ComputeCurrentTime in the analysis phase, but this comment does make sense.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1424536555


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -33,12 +34,14 @@ import org.apache.spark.sql.types.{StructField, StructType}
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsDown(p => ComputeCurrentTime(p)).resolveOperatorsWithPruning(

Review Comment:
   I did try that originally but I got:
   `throw QueryExecutionErrors.methodCalledInAnalyzerNotAllowedError()`
   
   I ended up with resolveOperatorsDown due to this comment:
   ```scala
     /**
      * In analyzer, use [[resolveOperatorsDown()]] instead. If this is used in the analyzer,
      * an exception will be thrown in test mode. It is however OK to call this function within
      * the scope of a [[resolveOperatorsDown()]] call.
      * @see [[org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning()]].
      */
     override def transformDownWithPruning(cond: TreePatternBits => Boolean,
   
   ```





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "srielau (via GitHub)" <gi...@apache.org>.
srielau commented on PR #44316:
URL: https://github.com/apache/spark/pull/44316#issuecomment-1858156901

   I agree with the semantic changes. General comment, though:
   VALUES in its current implementation is much too restrictive.
   Will this PR move us closer to allowing arbitrary expressions (including non determinism and correlation)?
   E.g. the following cannot be done in VALUES today:
   scala> spark.sql("VALUES (rand())").show()
   org.apache.spark.sql.AnalysisException: [INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE] Invalid inline table. Cannot evaluate the expression "rand()" in inline table definition. SQLSTATE: 42000; line 1 pos 8
   
   or:
   SELECT pk, c FROM T, LATERAL(VALUES(T.c1), (T.c2)) AS unpivot(c)
   
   




Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1430980065


##########
sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala:
##########
@@ -78,6 +78,11 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession {
 
     // Execution in one query should return the same value
     checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""), Row(true))
+    checkAnswer(sql(
+      """SELECT COUNT(DISTINCT ct) FROM VALUES
+        | CURRENT_TIMESTAMP(),
+        | CURRENT_TIMESTAMP(),
+        | CURRENT_TIMESTAMP() as data(ct)""".stripMargin), Row(1))

Review Comment:
   This doesn't seem to be needed, as we have the same test in the golden file.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1430112856


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -128,6 +128,19 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An temp resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class TempResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])

Review Comment:
   Maybe this is overly conservative. Many rules are skipped if the expression is already resolved. We can add the wrapper class in the future if it turns out to be needed.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1431386760


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -105,26 +107,31 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>
+      row.zipWithIndex.map {
+        case (e, ci) =>
+          val targetType = fields(ci).dataType
           val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) {
             e
           } else {
             cast(e, targetType)
           }
-          prepareForEval(castedExpr).eval()
-        } catch {
-          case NonFatal(ex) =>
-            table.failAnalysis(
-              errorClass = "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
-              messageParameters = Map("sqlExpr" -> toSQLExpr(e)),
-              cause = ex)
-        }
-      })
+          castedExpr
+      }
     }
 
-    LocalRelation(attributes, newRows)
+    ResolvedInlineTable(castedRows, attributes)
+  }
+
+  /**
+   * This function attempts to early evaluate rows in inline table.
+   * If evaluation doesn't rely on non-deterministic expressions (e.g. current_like)
+   * expressions will be evaluated and inlined as [[LocalRelation]]
+   * This is package visible for unit testing.
+   */
+  private[analysis] def earlyEvalIfPossible(table: ResolvedInlineTable): LogicalPlan = {
+      def earlyEvalPossible =
+        table.rows.flatten.forall(!_.containsPattern(CURRENT_LIKE))
+      if (earlyEvalPossible) EvalInlineTables(table) else table

Review Comment:
   ```suggestion
       val earlyEvalPossible = table.rows.flatten.forall(!_.containsPattern(CURRENT_LIKE))
       if (earlyEvalPossible) EvalInlineTables(table) else table
   ```





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1424433276


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -33,12 +34,14 @@ import org.apache.spark.sql.types.{StructField, StructType}
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsDown(p => ComputeCurrentTime(p)).resolveOperatorsWithPruning(

Review Comment:
   `ComputeCurrentTime` transforms the entire plan tree, so the code should be `ComputeCurrentTime(plan).resolveOperatorsWithPruning ...`





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on PR #44316:
URL: https://github.com/apache/spark/pull/44316#issuecomment-1858208985

   > I agree with the semantic changes. General comment, though: VALUES in its current implementation is much too restrictive. Will this PR move us closer to allowing arbitrary expressions (including non determinism and correlation)? E.g. the following cannot be done in VALUES today: scala> spark.sql("VALUES (rand())").show() org.apache.spark.sql.AnalysisException: [INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE] Invalid inline table. Cannot evaluate the expression "rand()" in inline table definition. SQLSTATE: 42000; line 1 pos 8
   > 
   > or: SELECT pk, c FROM T, LATERAL(VALUES(T.c1), (T.c2)) AS unpivot(c)
   
   Yeah, maybe I can use this PR to deal with the rand() problem as well (i.e. allow non-deterministic expressions in VALUES), because it is rather similar to the CURRENT_* issue.




Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on PR #44316:
URL: https://github.com/apache/spark/pull/44316#issuecomment-1855498798

   > I checked in Postgres. `select CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP;` The output:
   > 
   > ```
   > current_timestamp            |current_timestamp            |current_timestamp            |
   > -----------------------------+-----------------------------+-----------------------------+
   > 2023-12-14 16:27:19.663 +0800|2023-12-14 16:27:19.663 +0800|2023-12-14 16:27:19.663 +0800|
   > ```
   
   Yep. Just to illustrate, the same works for inline tables:
   
   ```sql
   postgres=# SELECT * FROM (VALUES(EXTRACT(epoch FROM current_timestamp),EXTRACT(epoch FROM current_timestamp),EXTRACT(epoch FROM current_timestamp)));
   ```
   
         column1      |      column2      |      column3
   -------------------+-------------------+-------------------
    1702546463.398428 | 1702546463.398428 | 1702546463.398428
   




Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1430978614


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -105,26 +107,40 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>
+      row.zipWithIndex.map {
+        case (e, ci) =>
+          val targetType = fields(ci).dataType
           val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) {
             e
           } else {
             cast(e, targetType)
           }
-          prepareForEval(castedExpr).eval()
-        } catch {
-          case NonFatal(ex) =>
-            table.failAnalysis(
-              errorClass = "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
-              messageParameters = Map("sqlExpr" -> toSQLExpr(e)),
-              cause = ex)
-        }
-      })
+          castedExpr
+      }
     }
 
-    LocalRelation(attributes, newRows)
+    val noCurrentLike: Boolean = castedRows.flatten.forall(!_.containsPattern(CURRENT_LIKE))
+    if (noCurrentLike) {
+      val evalRows: Seq[InternalRow] =

Review Comment:
   To avoid duplicated code, shall we always create a `ResolvedInlineTable` and then invoke `EvalInlineTables.apply` when there are no current_like expressions?
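
A rough sketch of that two-phase shape, as a self-contained toy in plain Scala. None of these types are Spark's: `ResolvedTable`, `LocalRows`, `CurrentLike`, `evalTable` and `finishAnalysis` are stand-ins invented here, and the real rules operate on Catalyst plans. The point is only that one evaluation function serves both the early path and the late path:

```scala
object TwoPhaseInlineTableDemo {
  // Toy expression tree; the names only echo the discussion.
  sealed trait Expr
  final case class Lit(value: Long) extends Expr
  case object CurrentLike extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  final case class ResolvedTable(rows: Seq[Seq[Expr]]) extends Plan // resolved, not yet evaluated
  final case class LocalRows(rows: Seq[Seq[Long]]) extends Plan     // fully evaluated

  def containsCurrentLike(e: Expr): Boolean = e match {
    case CurrentLike => true
    case Add(l, r)   => containsCurrentLike(l) || containsCurrentLike(r)
    case Lit(_)      => false
  }

  def replaceCurrent(e: Expr, now: Long): Expr = e match {
    case CurrentLike => Lit(now)
    case Add(l, r)   => Add(replaceCurrent(l, now), replaceCurrent(r, now))
    case lit: Lit    => lit
  }

  def eval(e: Expr): Long = e match {
    case Lit(v)      => v
    case Add(l, r)   => eval(l) + eval(r)
    case CurrentLike => sys.error("current-like expressions must be replaced before evaluation")
  }

  // The single place where rows are evaluated.
  def evalTable(t: ResolvedTable): LocalRows = LocalRows(t.rows.map(_.map(eval)))

  // Early path: evaluate right away only when nothing depends on the current time.
  def earlyEvalIfPossible(t: ResolvedTable): Plan = {
    val earlyEvalPossible = t.rows.flatten.forall(e => !containsCurrentLike(e))
    if (earlyEvalPossible) evalTable(t) else t
  }

  // Late path: substitute one shared timestamp, then reuse evalTable.
  def finishAnalysis(p: Plan, now: Long): Plan = p match {
    case ResolvedTable(rows) => evalTable(ResolvedTable(rows.map(_.map(replaceCurrent(_, now)))))
    case done: LocalRows     => done
  }

  def main(args: Array[String]): Unit = {
    val pending = ResolvedTable(Seq(Seq(Add(CurrentLike, Lit(1))), Seq(CurrentLike)))
    println(earlyEvalIfPossible(pending) == pending) // prints true
    println(finishAnalysis(pending, 100L))
  }
}
```

A table that mixes a current-like expression with deterministic ones stays unevaluated as a whole until the late path runs, so all of its rows see the same timestamp.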





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1430979164


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -70,6 +74,33 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Computes expressions in inline tables. This rule is supposed to be called at the very end
+ * of the analysis phase, given that all the expressions need to be fully resolved/replaced
+ * at this point.
+ */
+object EvalInlineTables extends Rule[LogicalPlan] with CastSupport {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDownWithSubqueriesAndPruning(
+    AlwaysProcess.fn, ruleId) {
+    case table: ResolvedInlineTable =>
+      val newRows: Seq[InternalRow] =
+        table.rows.map { row => InternalRow.fromSeq(
+            row.map(e =>

Review Comment:
   nit: a multi-line lambda should be wrapped in braces: `row.map { e => ...`





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1426479146


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -33,12 +34,14 @@ import org.apache.spark.sql.types.{StructField, StructType}
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsDown(p => ComputeCurrentTime(p)).resolveOperatorsWithPruning(

Review Comment:
   I think my current approach is not valid, i.e. if we run ComputeCurrentTime here we will break the following assumption:
   
   // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
   // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
   // However, because we also use the analyzer to canonicalized queries (for view definition),
   // we do not eliminate subqueries or compute current time in the analyzer.
   
   So, if we had an inline table in a view definition, this wouldn't work (although the current behaviour is incorrect as well).
   
   On the other hand, if we were to replace the current time only for inline table values, we might end up with different results for CURRENT_DATE/TIMESTAMP in other parts of the plan, so that doesn't work either.
   
   I will see if I can delay inline table evaluation, or at least delay it for expressions that we can guarantee will be replaced by a literal. This PR is on hold until then.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1428351612


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -128,6 +128,19 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An temp resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class TempResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])

Review Comment:
   It's fully resolved, just not evaluated. Maybe remove `Temp` from the name?





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1432721154


##########
sql/core/src/test/resources/sql-tests/inputs/inline-table.sql:
##########
@@ -60,3 +60,9 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-
 select * from values (try_add(5, 0));
 select * from values (try_divide(5, 0));
 select * from values (10 + try_divide(5, 0));
+
+-- now() should be kept as tempResolved inline expression.
+select count(distinct ct) from values now(), now(), now() as data(ct);
+
+-- current_timestamp() should be kept as tempResolved inline expression.
+select count(distinct ct) from values current_timestamp(), current_timestamp() as data(ct);

Review Comment:
   Shall we add tests that mix `current_timestamp` with deterministic functions?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -70,6 +75,34 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Computes expressions in inline tables. This rule is supposed to be called at the very end
+ * of the analysis phase, given that all the expressions need to be fully resolved/replaced
+ * at this point.
+ */
+object EvalInlineTables extends Rule[LogicalPlan] with CastSupport {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(INLINE_TABLE_EVAL)) {
+      case table: ResolvedInlineTable =>
+        val newRows: Seq[InternalRow] =
+          table.rows.map { row => InternalRow.fromSeq(
+            row.map { e =>

Review Comment:
   ```suggestion
             table.rows.map { row => InternalRow.fromSeq(row.map { e =>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1428353271


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -128,6 +128,19 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An temp resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class TempResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])

Review Comment:
   To avoid any perf regression, we can put `rows` in a wrapper class so that Catalyst rules won't see them by default. These expressions do not need any further resolution.
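
   The wrapper idea can be illustrated with a toy model. This is only a sketch, assuming the framework discovers a node's expressions by scanning its constructor fields; `HiddenRows`, `VisibleTable`, `WrappedTable` and `visibleExprs` are hypothetical names and not Catalyst classes.

   ```scala
   object HiddenRowsSketch {
     // Toy expression type; NOT a Catalyst class.
     sealed trait Expr
     final case class Lit(v: Int) extends Expr

     // A plain (non-case) class: the generic field scan below does not
     // recognise it as an expression or a collection, so its contents stay hidden.
     final class HiddenRows(val rows: Seq[Seq[Expr]])

     final case class VisibleTable(rows: Seq[Seq[Expr]]) // rows exposed directly
     final case class WrappedTable(rows: HiddenRows)     // rows behind the wrapper

     // Simplified stand-in for a rule framework that finds expressions by
     // walking the constructor fields of a node.
     def visibleExprs(node: Product): Seq[Expr] = {
       def gather(x: Any): Seq[Expr] = x match {
         case e: Expr        => Seq(e)
         case s: Iterable[_] => s.toSeq.flatMap(gather)
         case _              => Nil
       }
       node.productIterator.toSeq.flatMap(gather)
     }
   }
   ```

   In this model a generic rule walking `WrappedTable` finds no expressions to rewrite, while code that knows about the wrapper can still reach them through `rows.rows`.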





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1431388020


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -70,6 +75,33 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Computes expressions in inline tables. This rule is supposed to be called at the very end
+ * of the analysis phase, given that all the expressions need to be fully resolved/replaced
+ * at this point.
+ */
+object EvalInlineTables extends Rule[LogicalPlan] with CastSupport {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDownWithSubqueriesAndPruning(
+    AlwaysProcess.fn, ruleId) {

Review Comment:
   let's add pruning for `ResolvedInlineTable`
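
   A rough sketch of what pruning buys, using a toy plan tree rather than Catalyst's `TreeNode` API. The flag `containsInlineTable` is a hypothetical stand-in for a tree-pattern bit, and all class names are made up for illustration.

   ```scala
   object PruningSketch {
     // Toy plan nodes; `containsInlineTable` plays the role of a cheap pattern bit.
     sealed trait Plan { def containsInlineTable: Boolean }
     final case class InlineTable(id: Int) extends Plan { val containsInlineTable = true }
     final case class Evaluated(id: Int) extends Plan { val containsInlineTable = false }
     final case class Scan(name: String) extends Plan { val containsInlineTable = false }
     final case class Union(children: Seq[Plan]) extends Plan {
       val containsInlineTable: Boolean = children.exists(_.containsInlineTable)
     }

     // Returns the rewritten plan plus the number of nodes actually visited.
     def evalInlineTables(plan: Plan): (Plan, Int) =
       if (!plan.containsInlineTable) (plan, 0) // pruned: the subtree is skipped entirely
       else plan match {
         case InlineTable(id) => (Evaluated(id), 1)
         case Union(children) =>
           val results = children.map(evalInlineTables)
           (Union(results.map(_._1)), 1 + results.map(_._2).sum)
         case other => (other, 1)
       }
   }
   ```

   With an always-process condition every node would be visited; with the flag check, subtrees that contain no inline table are returned untouched.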





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1431386017


##########
sql/core/src/test/resources/sql-tests/results/inline-table.sql.out:
##########
@@ -271,3 +271,11 @@ select * from values (10 + try_divide(5, 0))
 struct<col1:double>
 -- !query output
 NULL
+
+
+-- !query
+select count(distinct ct) from values now(), now(), now() as data(ct)

Review Comment:
   We can add a test for `CURRENT_TIMESTAMP` and remove https://github.com/apache/spark/pull/44316/files#r1430980065





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #44316: [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions.
URL: https://github.com/apache/spark/pull/44316




Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1433678697


##########
sql/core/src/test/resources/sql-tests/inputs/inline-table.sql:
##########
@@ -60,3 +60,9 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-
 select * from values (try_add(5, 0));
 select * from values (try_divide(5, 0));
 select * from values (10 + try_divide(5, 0));
+
+-- now() should be kept as tempResolved inline expression.
+select count(distinct ct) from values now(), now(), now() as data(ct);
+
+-- current_timestamp() should be kept as tempResolved inline expression.
+select count(distinct ct) from values current_timestamp(), current_timestamp() as data(ct);

Review Comment:
   it's testing the correct value using count distinct.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1426236880


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -33,12 +34,14 @@ import org.apache.spark.sql.types.{StructField, StructType}
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsDown(p => ComputeCurrentTime(p)).resolveOperatorsWithPruning(

Review Comment:
   We can wrap the code with `AnalysisHelper.allowInvokingTransformsInAnalyzer` to bypass this limitation.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "MaxGekk (via GitHub)" <gi...@apache.org>.
MaxGekk commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1424212477


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -33,12 +34,14 @@ import org.apache.spark.sql.types.{StructField, StructType}
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsDown(p => ComputeCurrentTime(p)).resolveOperatorsWithPruning(

Review Comment:
   Just wondering: can't we move `ComputeCurrentTime` somewhere earlier and skip resolution here if the plan contains the `CURRENT_LIKE` pattern?





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1433352473


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -105,26 +107,30 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>

Review Comment:
   It seems we only need the `Seq[Expression]` here.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -131,6 +131,21 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class ResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])

Review Comment:
   Shall we simplify `rows: Seq[Seq[Expression]]` to `exprs: Seq[Expression]`?





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1427996408


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -33,12 +34,14 @@ import org.apache.spark.sql.types.{StructField, StructType}
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsDown(p => ComputeCurrentTime(p)).resolveOperatorsWithPruning(

Review Comment:
   I played with this a bit more and came up with the solution that is in the current PR (which, to be honest, I don't like that much, since it adds another rule to the post-analyzer phase).
   
   Here is the rough logic:
   1) In `ResolveInlineTables` we make sure that the shape is correct and that all the expressions are cast properly. Here we also decide whether all expressions can be reduced to a `LocalRelation` right away or whether we need to postpone that step. One option is to always postpone. I originally went with that approach, but it changed a bunch of golden files, since we would then have a `TempResolvedInlineTable` after every analysis run. Still, always postponing is cleaner; I'm looking for feedback here.
   2) In `EvalInlineTables`, after ReplaceCurrentTime/Like, we do the final evaluation if needed.
   
   By doing this we can fix a couple of additional "bugs". For example, when creating a view with CREATE VIEW V AS (SELECT * FROM VALUES (NOW())), the previous behaviour was to embed the view creation time in the query, while the proper behaviour is to calculate the time on every invocation of the view (I verified that PG does this).
   
   Another option is to do everything in the `EvalInlineTables` rule. The problem is that I think we will want to at least check whether the query is correct in the analysis phase (i.e. check casting and shape) and then do the final eval() in the finishAnalysis phase, which is part of the optimizer.
   
   In short, we should have a short discussion about this. @cloud-fan, @MaxGekk, @beliefer, @dtenedor and @srielau, please provide feedback.
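
   The two-step logic above can be sketched with a toy model. This is not the PR's code: `ResolvedTable`, `resolve`, `replaceCurrent`, `evaluate` and the expression classes are hypothetical, and the current-time placeholder is assumed to appear only as a top-level cell.

   ```scala
   object TwoPhaseInlineTableSketch {
     // Toy expressions; NOT Catalyst classes.
     sealed trait Expr
     final case class IntLit(v: Int) extends Expr
     final case class LongLit(v: Long) extends Expr
     final case class CastToLong(child: Expr) extends Expr
     case object CurrentMicros extends Expr // placeholder replaced before evaluation

     // Output of step 1: validated and cast, but still unevaluated.
     final case class ResolvedTable(rows: Seq[Seq[Expr]])

     // Step 1 (analysis): check the shape and add casts; do not evaluate.
     def resolve(rows: Seq[Seq[Expr]]): Either[String, ResolvedTable] =
       if (rows.map(_.size).distinct.size > 1) Left("rows have different column counts")
       else Right(ResolvedTable(rows.map(_.map {
         case i: IntLit => CastToLong(i)
         case other     => other
       })))

     // Between the steps: substitute one query-level timestamp everywhere.
     def replaceCurrent(t: ResolvedTable, nowMicros: Long): ResolvedTable =
       ResolvedTable(t.rows.map(_.map {
         case CurrentMicros => LongLit(nowMicros)
         case other         => other
       }))

     // Step 2 (finish-analysis): evaluate to concrete rows.
     def evalExpr(e: Expr): Long = e match {
       case IntLit(v)     => v.toLong
       case LongLit(v)    => v
       case CastToLong(c) => evalExpr(c)
       case CurrentMicros => sys.error("unreplaced current-time expression")
     }
     def evaluate(t: ResolvedTable): Seq[Seq[Long]] = t.rows.map(_.map(evalExpr))
   }
   ```

   Because `resolve` keeps the expressions unevaluated, a stored `ResolvedTable` (for example inside a view definition) could be passed through `replaceCurrent` with a fresh timestamp on every use.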





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1434080898


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -131,6 +131,21 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class ResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])

Review Comment:
   @dbatomic I reviewed this PR again. Sorry for the above comment.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -131,6 +131,21 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class ResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])

Review Comment:
   ~~Shall we simplify `rows: Seq[Seq[Expression]]` to `exprs: Seq[Expression]`?~~





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1434079720


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -105,26 +107,30 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>

Review Comment:
   I got it now. Thank you!





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1430978743


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -128,6 +128,19 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class ResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])
+  extends LeafNode { }

Review Comment:
   ```suggestion
     extends LeafNode
   ```





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on PR #44316:
URL: https://github.com/apache/spark/pull/44316#issuecomment-1853979737

   > I think we need a discussion.
   > 
   > ```
   > SELECT COUNT(DISTINCT ct) FROM VALUES
   > (CURRENT_TIMESTAMP()),
   > (CURRENT_TIMESTAMP()),
   > (CURRENT_TIMESTAMP()) as data(ct)
   > ```
   > 
   > Should the three calls of CURRENT_TIMESTAMP() have the same value?
   
   That's right. All invocations of CURRENT_TIMESTAMP/CURRENT_DATE/NOW() should be replaced with a single value that represents the query arrival time. We already do this in the majority of scenarios (e.g. when the time function appears pretty much anywhere else in the query). The bug this PR tries to fix is that we don't do this for inline tables.
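
   A minimal sketch of the difference, using a toy expression model and a fake clock instead of Catalyst and the system time. `Now`, `Lit`, `TickingClock`, `evalEagerly` and `replaceCurrentTime` are hypothetical names used only for illustration.

   ```scala
   object CurrentTimeSketch {
     // Toy expressions; NOT Catalyst classes.
     sealed trait Expr
     case object Now extends Expr                    // stands in for now()/current_timestamp()
     final case class Lit(micros: Long) extends Expr // an already-computed value

     // Fake clock that advances on every read, so per-row evaluation is observable.
     final class TickingClock {
       private var t = 0L
       def read(): Long = { t += 1; t }
     }

     // Behaviour being fixed: each row reads the clock on its own.
     def evalEagerly(rows: Seq[Expr], clock: TickingClock): Seq[Long] =
       rows.map {
         case Now    => clock.read()
         case Lit(v) => v
       }

     // Intended behaviour: read the clock once and substitute it everywhere.
     def replaceCurrentTime(rows: Seq[Expr], clock: TickingClock): Seq[Expr] = {
       val queryTime = Lit(clock.read())
       rows.map {
         case Now   => queryTime
         case other => other
       }
     }

     // Evaluation only accepts rows whose current-time expressions were replaced.
     def eval(rows: Seq[Expr]): Seq[Long] = rows.map {
       case Lit(v) => v
       case Now    => sys.error("current-time expression must be replaced before evaluation")
     }
   }
   ```

   In this model, evaluating three `Now` rows eagerly gives three distinct values, while replacing first and evaluating afterwards gives a single distinct value, which is what the `count(distinct ct)` tests in this PR check for.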




Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #44316:
URL: https://github.com/apache/spark/pull/44316#issuecomment-1855395706

   I checked in Postgres.
   `select CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP;`
   The output:
   ```
   current_timestamp            |current_timestamp            |current_timestamp            |
   -----------------------------+-----------------------------+-----------------------------+
   2023-12-14 16:27:19.663 +0800|2023-12-14 16:27:19.663 +0800|2023-12-14 16:27:19.663 +0800|
   ```




Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1431241503


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -105,26 +107,40 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>
+      row.zipWithIndex.map {
+        case (e, ci) =>
+          val targetType = fields(ci).dataType
           val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) {
             e
           } else {
             cast(e, targetType)
           }
-          prepareForEval(castedExpr).eval()
-        } catch {
-          case NonFatal(ex) =>
-            table.failAnalysis(
-              errorClass = "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
-              messageParameters = Map("sqlExpr" -> toSQLExpr(e)),
-              cause = ex)
-        }
-      })
+          castedExpr
+      }
     }
 
-    LocalRelation(attributes, newRows)
+    val noCurrentLike: Boolean = castedRows.flatten.forall(!_.containsPattern(CURRENT_LIKE))
+    if (noCurrentLike) {
+      val evalRows: Seq[InternalRow] =

Review Comment:
   Yep, good idea. I also organized code a bit better and made it more testable.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1433677281


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -105,26 +107,30 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>

Review Comment:
   it's a table (rows X columns)





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44316:
URL: https://github.com/apache/spark/pull/44316#issuecomment-1865816018

   The failed test is unrelated, thanks, merging to master/3.5!




Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "dbatomic (via GitHub)" <gi...@apache.org>.
dbatomic commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1430228124


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -128,6 +128,19 @@ case class UnresolvedInlineTable(
   lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
 }
 
+/**
+ * An temp resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class TempResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])

Review Comment:
   Yep, I also don't think it is a good idea to be that conservative here, especially if we want to move in the direction of making inline tables more flexible, as Serge pointed out.
   
   Please take a look at the latest iteration.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1431387202


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala:
##########
@@ -70,6 +75,33 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Computes expressions in inline tables. This rule is supposed to be called at the very end
+ * of the analysis phase, given that all the expressions need to be fully resolved/replaced
+ * at this point.
+ */
+object EvalInlineTables extends Rule[LogicalPlan] with CastSupport {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDownWithSubqueriesAndPruning(
+    AlwaysProcess.fn, ruleId) {
+    case table: ResolvedInlineTable =>
+      val newRows: Seq[InternalRow] =
+        table.rows.map { row => InternalRow.fromSeq(
+            row.map { e =>

Review Comment:
   nit: 2 spaces indentation.





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1433743211


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -105,26 +107,30 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>

Review Comment:
   I know that. Do you mean that the X columns differ for each row?





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1433352473


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -105,26 +107,30 @@ object ResolveInlineTables extends Rule[LogicalPlan]
     val attributes = DataTypeUtils.toAttributes(StructType(fields))
     assert(fields.size == table.names.size)
 
-    val newRows: Seq[InternalRow] = table.rows.map { row =>
-      InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
-        val targetType = fields(ci).dataType
-        try {
+    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>

Review Comment:
   ~~It seems we only need the `Seq[Expression]` here.~~





Re: [PR] [SPARK-46380][SQL]Replace current time/date prior to evaluating inline table expressions. [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44316:
URL: https://github.com/apache/spark/pull/44316#discussion_r1426482906


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala:
##########
@@ -33,12 +34,14 @@ import org.apache.spark.sql.types.{StructField, StructType}
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsDown(p => ComputeCurrentTime(p)).resolveOperatorsWithPruning(

Review Comment:
   Oh, good point. We should have a logical plan node for inline tables that keeps the expressions, and then replace it with `LocalRelation` after we run `ComputeCurrentTime`.
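
A minimal sketch of the two-step idea in this comment, assuming a toy expression and plan model rather than Spark's actual Catalyst classes. Every type and method below (`Expr`, `Plan`, `computeCurrentTime`, `evalInlineTables`, and the simplified `ResolvedInlineTable` and `LocalRelation`) is hypothetical and only borrows names from the discussion. It shows why the intermediate node has to keep expressions: the current-time placeholder can only be substituted while the rows are still unevaluated.

```scala
// Toy model for illustration only. These are NOT Spark's Catalyst classes;
// every name below is a hypothetical stand-in borrowed from the discussion.
sealed trait Expr
final case class Literal(value: Long) extends Expr
case object CurrentTimestamp extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

sealed trait Plan
// Intermediate node: keeps the inline table's rows as unevaluated expressions.
final case class ResolvedInlineTable(rows: Seq[Seq[Expr]]) extends Plan
// Final node: holds fully evaluated values.
final case class LocalRelation(rows: Seq[Seq[Long]]) extends Plan

object InlineTableSketch {
  // Step 1: substitute every CurrentTimestamp with the same literal, so all
  // cells of the table observe one consistent notion of "now".
  def computeCurrentTime(plan: Plan, now: Long): Plan = plan match {
    case ResolvedInlineTable(rows) =>
      ResolvedInlineTable(rows.map(_.map(replaceNow(_, now))))
    case other => other
  }

  private def replaceNow(e: Expr, now: Long): Expr = e match {
    case CurrentTimestamp => Literal(now)
    case Add(l, r) => Add(replaceNow(l, now), replaceNow(r, now))
    case lit: Literal => lit
  }

  // Step 2: only after substitution, evaluate each cell and produce the
  // evaluated relation.
  def evalInlineTables(plan: Plan): Plan = plan match {
    case ResolvedInlineTable(rows) => LocalRelation(rows.map(_.map(eval)))
    case other => other
  }

  private def eval(e: Expr): Long = e match {
    case Literal(v) => v
    case Add(l, r) => eval(l) + eval(r)
    case CurrentTimestamp =>
      throw new IllegalStateException(
        "CurrentTimestamp must be replaced before evaluation")
  }
}
```

In this sketch, calling `evalInlineTables` before `computeCurrentTime` fails on any row that still contains `CurrentTimestamp`, which mirrors the ordering constraint raised in the review: evaluation has to happen after the current-time replacement.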


