Posted to commits@spark.apache.org by we...@apache.org on 2023/02/27 08:40:58 UTC

[spark] branch master updated: [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd6b751a5b1 [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements
bd6b751a5b1 is described below

commit bd6b751a5b1c0b9b81039ca554d00e5ef841205d
Author: Allison Portis <al...@databricks.com>
AuthorDate: Mon Feb 27 16:40:38 2023 +0800

    [SPARK-41290][SQL] Support GENERATED ALWAYS AS expressions for columns in create/replace table statements
    
    ### What changes were proposed in this pull request?
    
    Enables creating generated columns in CREATE/REPLACE TABLE statements by specifying a generation expression for a column with `GENERATED ALWAYS AS expr`.
    
    For example, the following will be supported:
    ```sql
    CREATE TABLE default.example (
        time TIMESTAMP,
        date DATE GENERATED ALWAYS AS (CAST(time AS DATE))
    )
    ```
    
    To be more specific, this PR:
    1. Adds parser support for `GENERATED ALWAYS AS expr` in create/replace table statements. Generation expressions are temporarily stored in the field's metadata, and then are parsed/verified in `DataSourceV2Strategy` and used to instantiate v2 [Column](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java).
    2. Adds `TableCatalog::capabilities()` and `TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS`. This capability determines whether specifying generation expressions is allowed or an exception is thrown, as sketched below.
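    
    For illustration, a catalog opts in by overriding `capabilities()`. A minimal Scala sketch, mirroring the `InMemoryTableCatalog` change in this commit (`MyCatalog` is a hypothetical name; the class is left abstract because the remaining `TableCatalog` methods are elided):
    ```scala
    import java.util
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.connector.catalog.{TableCatalog, TableCatalogCapability}
    
    // Abstract because createTable/loadTable/etc. are omitted from this sketch.
    abstract class MyCatalog extends TableCatalog {
      // Declaring the capability lets CREATE/REPLACE TABLE statements with
      // GENERATED ALWAYS AS columns pass analysis for this catalog.
      override def capabilities(): util.Set[TableCatalogCapability] =
        Set(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS).asJava
    }
    ```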
    
    ### Why are the changes needed?
    
    `GENERATED ALWAYS AS` is part of the SQL standard. These changes allow defining generated columns in create/replace table statements in Spark SQL.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Using `GENERATED ALWAYS AS expr` in CREATE/REPLACE TABLE statements will no longer throw a parsing error. When used with a table catalog that supports the feature, the query will proceed; when used with a catalog that does not, an `AnalysisException` will be thrown.
    
    Previous behavior:
    ```
    spark-sql> CREATE TABLE default.example (
             >     time TIMESTAMP,
             >     date DATE GENERATED ALWAYS AS (CAST(time AS DATE))
             > )
             > ;
    Error in query:
    Syntax error at or near 'GENERATED'(line 3, pos 14)
    
    == SQL ==
    CREATE TABLE default.example (
        time TIMESTAMP,
        date DATE GENERATED ALWAYS AS (CAST(time AS DATE))
    --------------^^^
    )
    ```
    
    New behavior:
    ```
    AnalysisException: [UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table `my_tab` does not
    support creating generated columns with GENERATED ALWAYS AS expressions. Please check the current catalog and
    namespace to make sure the qualified table name is expected, and also check the catalog implementation which is
    configured by "spark.sql.catalog".
    ```
    
    ### How was this patch tested?
    
    Adds unit tests.
    
    Closes #38823 from allisonport-db/parser-support.
    
    Authored-by: Allison Portis <al...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 core/src/main/resources/error/error-classes.json   |  10 +
 docs/sql-ref-ansi-compliance.md                    |   2 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |   2 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   9 +
 .../apache/spark/sql/connector/catalog/Column.java |  35 +++-
 .../spark/sql/connector/catalog/TableCatalog.java  |   7 +
 .../connector/catalog/TableCatalogCapability.java  |  48 +++++
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  37 +++-
 .../spark/sql/catalyst/util/GeneratedColumn.scala  | 202 +++++++++++++++++++
 .../sql/connector/catalog/CatalogV2Util.scala      |  59 ++++--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  10 +
 .../spark/sql/internal/connector/ColumnImpl.scala  |   1 +
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  47 ++++-
 .../connector/catalog/InMemoryTableCatalog.scala   |   5 +
 .../execution/datasources/DataSourceStrategy.scala |   9 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |   8 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 222 +++++++++++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala     |  11 +
 18 files changed, 694 insertions(+), 30 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 8910ca86de4..408c97acaa3 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -571,6 +571,11 @@
     ],
     "sqlState" : "42809"
   },
+  "GENERATED_COLUMN_WITH_DEFAULT_VALUE" : {
+    "message" : [
+      "A column cannot have both a default value and a generation expression but column <colName> has default value: (<defaultValue>) and generation expression: (<genExpr>)."
+    ]
+  },
   "GRAPHITE_SINK_INVALID_PROTOCOL" : {
     "message" : [
       "Invalid Graphite protocol: <protocol>."
@@ -1607,6 +1612,11 @@
     },
     "sqlState" : "0A000"
   },
+  "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN" : {
+    "message" : [
+      "Cannot create generated column <fieldName> with generation expression <expressionStr> because <reason>."
+    ]
+  },
   "UNSUPPORTED_EXPR_FOR_OPERATOR" : {
     "message" : [
       "A query operator contains one or more unsupported expressions. Consider to rewrite it to avoid window functions, aggregate functions, and generator functions in the WHERE clause.",
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 1501e14c604..4124e958e39 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -353,6 +353,7 @@ Below is a list of all the keywords in Spark SQL.
 |AFTER|non-reserved|non-reserved|non-reserved|
 |ALL|reserved|non-reserved|reserved|
 |ALTER|non-reserved|non-reserved|reserved|
+|ALWAYS|non-reserved|non-reserved|non-reserved|
 |ANALYZE|non-reserved|non-reserved|non-reserved|
 |AND|reserved|non-reserved|reserved|
 |ANTI|non-reserved|strict-non-reserved|non-reserved|
@@ -451,6 +452,7 @@ Below is a list of all the keywords in Spark SQL.
 |FULL|reserved|strict-non-reserved|reserved|
 |FUNCTION|non-reserved|non-reserved|reserved|
 |FUNCTIONS|non-reserved|non-reserved|non-reserved|
+|GENERATED|non-reserved|non-reserved|non-reserved|
 |GLOBAL|non-reserved|non-reserved|reserved|
 |GRANT|reserved|non-reserved|reserved|
 |GROUP|reserved|non-reserved|reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 38f52901aa2..6d0862290cf 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -91,6 +91,7 @@ ADD: 'ADD';
 AFTER: 'AFTER';
 ALL: 'ALL';
 ALTER: 'ALTER';
+ALWAYS: 'ALWAYS';
 ANALYZE: 'ANALYZE';
 AND: 'AND';
 ANTI: 'ANTI';
@@ -189,6 +190,7 @@ FROM: 'FROM';
 FULL: 'FULL';
 FUNCTION: 'FUNCTION';
 FUNCTIONS: 'FUNCTIONS';
+GENERATED: 'GENERATED';
 GLOBAL: 'GLOBAL';
 GRANT: 'GRANT';
 GROUP: 'GROUP';
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 7c073411188..aa5f538bbf6 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1035,9 +1035,14 @@ createOrReplaceTableColType
 colDefinitionOption
     : NOT NULL
     | defaultExpression
+    | generationExpression
     | commentSpec
     ;
 
+generationExpression
+    : GENERATED ALWAYS AS LEFT_PAREN expression RIGHT_PAREN
+    ;
+
 complexColTypeList
     : complexColType (COMMA complexColType)*
     ;
@@ -1183,6 +1188,7 @@ ansiNonReserved
     : ADD
     | AFTER
     | ALTER
+    | ALWAYS
     | ANALYZE
     | ANTI
     | ANY_VALUE
@@ -1252,6 +1258,7 @@ ansiNonReserved
     | FORMATTED
     | FUNCTION
     | FUNCTIONS
+    | GENERATED
     | GLOBAL
     | GROUPING
     | HOUR
@@ -1441,6 +1448,7 @@ nonReserved
     | AFTER
     | ALL
     | ALTER
+    | ALWAYS
     | ANALYZE
     | AND
     | ANY
@@ -1535,6 +1543,7 @@ nonReserved
     | FROM
     | FUNCTION
     | FUNCTIONS
+    | GENERATED
     | GLOBAL
     | GRANT
     | GROUP
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
index d2c8f25e739..b191438dbc3 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Column.java
@@ -33,6 +33,8 @@ import org.apache.spark.sql.types.DataType;
  * {@link TableCatalog#createTable(Identifier, Column[], Transform[], Map)}, and report it in
  * {@link Table#columns()} by calling the static {@code create} functions of this interface to
  * create it.
+ * <p>
+ * A column cannot have both a default value and a generation expression.
  */
 @Evolving
 public interface Column {
@@ -42,7 +44,16 @@ public interface Column {
   }
 
   static Column create(String name, DataType dataType, boolean nullable) {
-    return create(name, dataType, nullable, null, null, null);
+    return create(name, dataType, nullable, null, null);
+  }
+
+  static Column create(
+      String name,
+      DataType dataType,
+      boolean nullable,
+      String comment,
+      String metadataInJSON) {
+    return new ColumnImpl(name, dataType, nullable, comment, null, null, metadataInJSON);
   }
 
   static Column create(
@@ -52,7 +63,18 @@ public interface Column {
       String comment,
       ColumnDefaultValue defaultValue,
       String metadataInJSON) {
-    return new ColumnImpl(name, dataType, nullable, comment, defaultValue, metadataInJSON);
+    return new ColumnImpl(name, dataType, nullable, comment, defaultValue, null, metadataInJSON);
+  }
+
+  static Column create(
+      String name,
+      DataType dataType,
+      boolean nullable,
+      String comment,
+      String generationExpression,
+      String metadataInJSON) {
+    return new ColumnImpl(name, dataType, nullable, comment, null,
+            generationExpression, metadataInJSON);
   }
 
   /**
@@ -82,6 +104,15 @@ public interface Column {
   @Nullable
   ColumnDefaultValue defaultValue();
 
+  /**
+   * Returns the generation expression of this table column. Null means no generation expression.
+   * <p>
+   * The generation expression is stored in Spark SQL dialect. It is up to the data source to verify
+   * expression compatibility and reject writes as necessary.
+   */
+  @Nullable
+  String generationExpression();
+
   /**
    * Returns the column metadata in JSON format.
    */
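
The new `create` overloads above let a connector attach a generation expression when reporting or creating columns. A minimal Scala sketch of calling the new factory (the column name and expression are illustrative):

```scala
import org.apache.spark.sql.connector.catalog.Column
import org.apache.spark.sql.types.IntegerType

// Build a v2 column equivalent to:
//   eventYear INT GENERATED ALWAYS AS (year(eventDate))
// The expression is kept as Spark SQL text; the data source is responsible
// for verifying that it can honor it (see generationExpression()).
val eventYear: Column = Column.create(
  "eventYear",       // name
  IntegerType,       // data type
  true,              // nullable
  null,              // no comment
  "year(eventDate)", // generation expression
  null)              // no column metadata JSON

assert(eventYear.generationExpression() == "year(eventDate)")
assert(eventYear.defaultValue() == null) // a column cannot have both
```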
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 82622d65205..eb442ad38bd 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.errors.QueryCompilationErrors;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Catalog methods for working with Tables.
@@ -78,6 +80,11 @@ public interface TableCatalog extends CatalogPlugin {
    */
   String OPTION_PREFIX = "option.";
 
+  /**
+   * @return the set of capabilities for this TableCatalog
+   */
+  default Set<TableCatalogCapability> capabilities() { return Collections.emptySet(); }
+
   /**
    * List the tables in a namespace from the catalog.
    * <p>
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
new file mode 100644
index 00000000000..84a2a0f7648
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Capabilities that can be provided by a {@link TableCatalog} implementation.
+ * <p>
+ * TableCatalogs use {@link TableCatalog#capabilities()} to return a set of capabilities. Each
+ * capability signals to Spark that the catalog supports a feature identified by the capability.
+ * For example, returning {@link #SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS} allows Spark to
+ * accept {@code GENERATED ALWAYS AS} expressions in {@code CREATE TABLE} statements.
+ *
+ * @since 3.4.0
+ */
+@Evolving
+public enum TableCatalogCapability {
+
+  /**
+  * Signals that the TableCatalog supports defining generated columns upon table creation in SQL.
+  * <p>
+  * Without this capability, any create/replace table statements with a generated column defined
+  * in the table schema will throw an exception during analysis.
+  * <p>
+  * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)}
+  * <p>
+  * Generation expressions are included in the column definition for APIs like
+  * {@link TableCatalog#createTable}.
+  * See {@link Column#generationExpression()}.
+  */
+  SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8957794ad95..1c5eac4ce17 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils, ResolveDefaultColumns}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, GeneratedColumn, IntervalUtils, ResolveDefaultColumns}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -3002,6 +3002,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     // Check that no duplicates exist among any CREATE TABLE column options specified.
     var nullable = true
     var defaultExpression: Option[DefaultExpressionContext] = None
+    var generationExpression: Option[GenerationExpressionContext] = None
     var commentSpec: Option[CommentSpecContext] = None
     ctx.colDefinitionOption().asScala.foreach { option =>
       if (option.NULL != null) {
@@ -3018,6 +3019,13 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         }
         defaultExpression = Some(expr)
       }
+      Option(option.generationExpression()).foreach { expr =>
+        if (generationExpression.isDefined) {
+          throw QueryParsingErrors.duplicateCreateTableColumnOption(
+            option, colName.getText, "GENERATED ALWAYS AS")
+        }
+        generationExpression = Some(expr)
+      }
       Option(option.commentSpec()).foreach { spec =>
         if (commentSpec.isDefined) {
           throw QueryParsingErrors.duplicateCreateTableColumnOption(
@@ -3042,6 +3050,11 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
         throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
       }
     }
+    // Add the 'GENERATED ALWAYS AS expression' clause in the column definition, if any, to the
+    // column metadata.
+    generationExpression.map(visitGenerationExpression).foreach { field =>
+      builder.putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, field)
+    }
 
     val name: String = colName.getText
 
@@ -3100,11 +3113,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     string(visitStringLit(ctx.stringLit))
   }
 
-  /**
-   * Create a default string.
-   */
-  override def visitDefaultExpression(ctx: DefaultExpressionContext): String = withOrigin(ctx) {
-    val exprCtx = ctx.expression()
+  private def verifyAndGetExpression(exprCtx: ExpressionContext): String = {
     // Make sure it can be converted to Catalyst expressions.
     expression(exprCtx)
     // Extract the raw expression text so that we can save the user provided text. We don't
@@ -3116,6 +3125,22 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     exprCtx.getStart.getInputStream.getText(new Interval(start, end))
   }
 
+  /**
+   * Create a default string.
+   */
+  override def visitDefaultExpression(ctx: DefaultExpressionContext): String =
+    withOrigin(ctx) {
+      verifyAndGetExpression(ctx.expression())
+    }
+
+  /**
+   * Create a generation expression string.
+   */
+  override def visitGenerationExpression(ctx: GenerationExpressionContext): String =
+    withOrigin(ctx) {
+      verifyAndGetExpression(ctx.expression())
+    }
+
   /**
    * Create an optional comment string.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
new file mode 100644
index 00000000000..6ff5df98d3c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Expression}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
+import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+
+/**
+ * This object contains utility methods and values for Generated Columns
+ */
+object GeneratedColumn {
+
+  /**
+   * The metadata key for saving a generation expression in a generated column's metadata. This is
+   * only used internally and connectors should access generation expressions from the V2 columns.
+   */
+  val GENERATION_EXPRESSION_METADATA_KEY = "GENERATION_EXPRESSION"
+
+  /** Parser for parsing generation expression SQL strings */
+  private lazy val parser = new CatalystSqlParser()
+
+  /**
+   * Whether the given `field` is a generated column
+   */
+  def isGeneratedColumn(field: StructField): Boolean = {
+    field.metadata.contains(GENERATION_EXPRESSION_METADATA_KEY)
+  }
+
+  /**
+   * Returns the generation expression stored in the column metadata if it exists
+   */
+  def getGenerationExpression(field: StructField): Option[String] = {
+    if (isGeneratedColumn(field)) {
+      Some(field.metadata.getString(GENERATION_EXPRESSION_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Whether the `schema` has one or more generated columns
+   */
+  def hasGeneratedColumns(schema: StructType): Boolean = {
+    schema.exists(isGeneratedColumn)
+  }
+
+  /**
+   * Parse and analyze `expressionStr` and perform verification. This means:
+   * - The expression cannot reference itself
+   * - The expression cannot reference other generated columns
+   * - No user-defined expressions
+   * - The expression must be deterministic
+   * - The expression data type can be safely up-cast to the destination column data type
+   * - No subquery expressions
+   *
+   * Throws an [[AnalysisException]] if the expression cannot be converted or is an invalid
+   * generation expression according to the above rules.
+   */
+  private def analyzeAndVerifyExpression(
+      expressionStr: String,
+      fieldName: String,
+      dataType: DataType,
+      schema: StructType,
+      statementType: String): Unit = {
+    def unsupportedExpressionError(reason: String): AnalysisException = {
+      new AnalysisException(
+        errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+        messageParameters = Map(
+          "fieldName" -> fieldName,
+          "expressionStr" -> expressionStr,
+          "reason" -> reason))
+    }
+
+    // Parse the expression string
+    val parsed: Expression = try {
+      parser.parseExpression(expressionStr)
+    } catch {
+      case ex: ParseException =>
+        // Shouldn't be possible since we check that the expression is a valid catalyst expression
+        // during parsing
+        throw SparkException.internalError(
+          s"Failed to execute $statementType command because the column $fieldName has " +
+            s"generation expression $expressionStr which fails to parse as a valid expression:" +
+            s"\n${ex.getMessage}")
+    }
+    // Don't allow subquery expressions
+    if (parsed.containsPattern(PLAN_EXPRESSION)) {
+      throw unsupportedExpressionError("subquery expressions are not allowed for generated columns")
+    }
+    // Analyze the parsed result
+    val allowedBaseColumns = schema
+      .filterNot(_.name == fieldName) // Can't reference itself
+      .filterNot(isGeneratedColumn) // Can't reference other generated columns
+    val relation = new LocalRelation(StructType(allowedBaseColumns).toAttributes)
+    val plan = try {
+      val analyzer: Analyzer = GeneratedColumnAnalyzer
+      val analyzed = analyzer.execute(Project(Seq(Alias(parsed, fieldName)()), relation))
+      analyzer.checkAnalysis(analyzed)
+      analyzed
+    } catch {
+      case ex: AnalysisException =>
+        // Improve error message if possible
+        if (ex.getErrorClass == "UNRESOLVED_COLUMN.WITH_SUGGESTION") {
+          ex.messageParameters.get("objectName").foreach { unresolvedCol =>
+            val resolver = SQLConf.get.resolver
+            // Whether `col` = `unresolvedCol` taking into account case-sensitivity
+            def isUnresolvedCol(col: String) =
+              resolver(unresolvedCol, QueryCompilationErrors.toSQLId(col))
+            // Check whether the unresolved column is this column
+            if (isUnresolvedCol(fieldName)) {
+              throw unsupportedExpressionError("generation expression cannot reference itself")
+            }
+            // Check whether the unresolved column is another generated column in the schema
+            if (schema.exists(col => isGeneratedColumn(col) && isUnresolvedCol(col.name))) {
+              throw unsupportedExpressionError(
+                "generation expression cannot reference another generated column")
+            }
+          }
+        }
+        if (ex.getErrorClass == "UNRESOLVED_ROUTINE") {
+          // Cannot resolve function using built-in catalog
+          ex.messageParameters.get("routineName").foreach { fnName =>
+            throw unsupportedExpressionError(s"failed to resolve $fnName to a built-in function")
+          }
+        }
+        throw ex
+    }
+    val analyzed = plan.collectFirst {
+      case Project(Seq(a: Alias), _: LocalRelation) => a.child
+    }.get
+    if (!analyzed.deterministic) {
+      throw unsupportedExpressionError("generation expression is not deterministic")
+    }
+    if (!Cast.canUpCast(analyzed.dataType, dataType)) {
+      throw unsupportedExpressionError(
+        s"generation expression data type ${analyzed.dataType.simpleString} " +
+        s"is incompatible with column data type ${dataType.simpleString}")
+    }
+  }
+
+  /**
+   * For any generated columns in `schema`, parse, analyze and verify the generation expression.
+   */
+  private def verifyGeneratedColumns(schema: StructType, statementType: String): Unit = {
+    schema.foreach { field =>
+      getGenerationExpression(field).foreach { expressionStr =>
+        analyzeAndVerifyExpression(expressionStr, field.name, field.dataType, schema, statementType)
+      }
+    }
+  }
+
+  /**
+   * If `schema` contains any generated columns:
+   * 1) Check whether the table catalog supports generated columns. Otherwise throw an error.
+   * 2) Parse, analyze and verify the generation expressions for any generated columns.
+   */
+  def validateGeneratedColumns(
+      schema: StructType,
+      catalog: TableCatalog,
+      ident: Seq[String],
+      statementType: String): Unit = {
+    if (hasGeneratedColumns(schema)) {
+      if (!catalog.capabilities().contains(
+        TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS)) {
+        throw QueryCompilationErrors.generatedColumnsUnsupported(ident)
+      }
+      GeneratedColumn.verifyGeneratedColumns(schema, statementType)
+    }
+  }
+}
+
+/**
+ * Analyzer for processing generated column expressions using built-in functions only.
+ */
+object GeneratedColumnAnalyzer extends Analyzer(
+  new CatalogManager(BuiltInFunctionCatalog, BuiltInFunctionCatalog.v1Catalog)) {
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 9b481356fa6..12a8db92363 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -22,9 +22,11 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.util.GeneratedColumn
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
@@ -471,43 +473,62 @@ private[sql] object CatalogV2Util {
 
   /**
    * Converts a StructType to DS v2 columns, which decodes the StructField metadata to v2 column
-   * comment and default value. This is mainly used to generate DS v2 columns from table schema in
-   * DDL commands, so that Spark can pass DS v2 columns to DS v2 createTable and related APIs.
+   * comment and default value or generation expression. This is mainly used to generate DS v2
+   * columns from table schema in DDL commands, so that Spark can pass DS v2 columns to DS v2
+   * createTable and related APIs.
    */
   def structTypeToV2Columns(schema: StructType): Array[Column] = {
     schema.fields.map(structFieldToV2Column)
   }
 
   private def structFieldToV2Column(f: StructField): Column = {
-    def createV2Column(defaultValue: ColumnDefaultValue, metadata: Metadata): Column = {
-      val metadataJSON = if (metadata == Metadata.empty) {
+    def metadataAsJson(metadata: Metadata): String = {
+      if (metadata == Metadata.empty) {
         null
       } else {
         metadata.json
       }
-      Column.create(
-        f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue, metadataJSON)
     }
-    if (f.getCurrentDefaultValue().isDefined && f.getExistenceDefaultValue().isDefined) {
+    def metadataWithKeysRemoved(keys: Seq[String]): Metadata = {
+      keys.foldLeft(new MetadataBuilder().withMetadata(f.metadata)) {
+        (builder, key) => builder.remove(key)
+      }.build()
+    }
+
+    val isDefaultColumn = f.getCurrentDefaultValue().isDefined &&
+      f.getExistenceDefaultValue().isDefined
+    val isGeneratedColumn = GeneratedColumn.isGeneratedColumn(f)
+    if (isDefaultColumn && isGeneratedColumn) {
+      throw new AnalysisException(
+        errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE",
+        messageParameters = Map(
+          "colName" -> f.name,
+          "defaultValue" -> f.getCurrentDefaultValue().get,
+          "genExpr" -> GeneratedColumn.getGenerationExpression(f).get
+        )
+      )
+    }
+
+    if (isDefaultColumn) {
       val e = analyze(f, EXISTS_DEFAULT_COLUMN_METADATA_KEY)
       assert(e.resolved && e.foldable,
         "The existence default value must be a simple SQL string that is resolved and foldable, " +
           "but got: " + f.getExistenceDefaultValue().get)
       val defaultValue = new ColumnDefaultValue(
         f.getCurrentDefaultValue().get, LiteralValue(e.eval(), f.dataType))
-      val cleanedMetadata = new MetadataBuilder()
-        .withMetadata(f.metadata)
-        .remove("comment")
-        .remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
-        .remove(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
-        .build()
-      createV2Column(defaultValue, cleanedMetadata)
+      val cleanedMetadata = metadataWithKeysRemoved(
+        Seq("comment", CURRENT_DEFAULT_COLUMN_METADATA_KEY, EXISTS_DEFAULT_COLUMN_METADATA_KEY))
+      Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull, defaultValue,
+        metadataAsJson(cleanedMetadata))
+    } else if (isGeneratedColumn) {
+      val cleanedMetadata = metadataWithKeysRemoved(
+        Seq("comment", GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY))
+      Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
+        GeneratedColumn.getGenerationExpression(f).get, metadataAsJson(cleanedMetadata))
     } else {
-      val cleanedMetadata = new MetadataBuilder()
-        .withMetadata(f.metadata)
-        .remove("comment")
-        .build()
-      createV2Column(null, cleanedMetadata)
+      val cleanedMetadata = metadataWithKeysRemoved(Seq("comment"))
+      Column.create(f.name, f.dataType, f.nullable, f.getComment().orNull,
+        metadataAsJson(cleanedMetadata))
     }
   }
 }
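
Continuing the `GeneratedColumn` sketch above: `structTypeToV2Columns` now moves the internal metadata entry onto the v2 `Column` and strips it from the column metadata. A sketch (note `CatalogV2Util` is `private[sql]`, so this is only callable from Spark-internal code):

```scala
import org.apache.spark.sql.catalyst.util.GeneratedColumn
import org.apache.spark.sql.connector.catalog.CatalogV2Util
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructType}

// Same illustrative schema as in the GeneratedColumn sketch above.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", IntegerType, nullable = true,
    new MetadataBuilder()
      .putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, "a + 1")
      .build())

val columns = CatalogV2Util.structTypeToV2Columns(schema)
assert(columns(1).name() == "b")
assert(columns(1).generationExpression() == "a + 1")
// The internal GENERATION_EXPRESSION key is stripped from the column metadata.
assert(columns(1).metadataInJSON() == null)
```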
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 556b3a62da3..1c257966aaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3405,6 +3405,16 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
     }
   }
 
+  def generatedColumnsUnsupported(nameParts: Seq[String]): AnalysisException = {
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+      messageParameters = Map(
+        "tableName" -> toSQLId(nameParts),
+        "operation" -> "generated columns"
+      )
+    )
+  }
+
   def ambiguousLateralColumnAliasError(name: String, numOfMatches: Int): Throwable = {
     new AnalysisException(
       errorClass = "AMBIGUOUS_LATERAL_COLUMN_ALIAS",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
index 5ab3f83eeae..2a67ffc4bbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/ColumnImpl.scala
@@ -27,4 +27,5 @@ case class ColumnImpl(
     nullable: Boolean,
     comment: String,
     defaultValue: ColumnDefaultValue,
+    generationExpression: String,
     metadataInJSON: String) extends Column
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 0efbd75ad93..5196d19ffcd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{TableSpec => LogicalTableSpec, _}
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
+import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
@@ -2717,4 +2717,49 @@ class DDLParserSuite extends AnalysisTest {
       context = ExpectedContext(
         fragment = "b STRING COMMENT \"abc\" NOT NULL COMMENT \"abc\"", start = 27, stop = 71))
   }
+
+  test("SPARK-41290: implement parser support for GENERATED ALWAYS AS columns in tables") {
+    val schemaWithGeneratedColumn = new StructType()
+      .add("a", IntegerType, true)
+      .add("b", IntegerType, false,
+        new MetadataBuilder()
+          .putString(GeneratedColumn.GENERATION_EXPRESSION_METADATA_KEY, "a+1")
+          .build())
+    comparePlans(parsePlan(
+      "CREATE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
+      CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
+        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
+          Map.empty[String, String], None, None, None, false), false))
+    comparePlans(parsePlan(
+      "REPLACE TABLE my_tab(a INT, b INT NOT NULL GENERATED ALWAYS AS (a+1)) USING parquet"),
+      ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithGeneratedColumn,
+        Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
+          Map.empty[String, String], None, None, None, false), false))
+    // Two generation expressions
+    checkError(
+      exception = parseException("CREATE TABLE my_tab(a INT, " +
+          "b INT GENERATED ALWAYS AS (a + 1) GENERATED ALWAYS AS (a + 2)) USING PARQUET"),
+      errorClass = "CREATE_TABLE_COLUMN_OPTION_DUPLICATE",
+      parameters = Map("columnName" -> "b", "optionName" -> "GENERATED ALWAYS AS"),
+      context = ExpectedContext(
+        fragment = "b INT GENERATED ALWAYS AS (a + 1) GENERATED ALWAYS AS (a + 2)",
+        start = 27,
+        stop = 87
+      )
+    )
+    // Empty expression
+    checkError(
+      exception = parseException(
+        "CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS ()) USING PARQUET"),
+      errorClass = "PARSE_SYNTAX_ERROR",
+      parameters = Map("error" -> "')'", "hint" -> "")
+    )
+    // No parenthesis
+    checkError(
+      exception = parseException(
+        "CREATE TABLE my_tab(a INT, b INT GENERATED ALWAYS AS a + 1) USING PARQUET"),
+      errorClass = "PARSE_SYNTAX_ERROR",
+      parameters = Map("error" -> "'a'", "hint" -> ": missing '('")
+    )
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 50bea2b8d2f..e82f203742b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -170,6 +170,11 @@ class BasicInMemoryTableCatalog extends TableCatalog {
 }
 
 class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces {
+
+  override def capabilities: java.util.Set[TableCatalogCapability] = {
+    Set(TableCatalogCapability.SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS).asJava
+  }
+
   protected def allNamespaces: Seq[Seq[String]] = {
     (tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 8b985e82963..74539b54117 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
-import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.SupportsRead
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue}
@@ -136,6 +136,13 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends Rule[LogicalPlan] {
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
           tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
+
+      if (GeneratedColumn.hasGeneratedColumns(newSchema)) {
+        throw QueryCompilationErrors.generatedColumnsUnsupported(
+          Seq(tableDesc.identifier.catalog.get, tableDesc.identifier.database.get,
+            tableDesc.identifier.table))
+      }
+
       val newTableDesc = tableDesc.copy(schema = newSchema)
       CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == SaveMode.Ignore)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b45de06371c..71ffe65b42a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, ResolveDefaultColumns, V2ExpressionBuilder}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder}
 import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable}
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
@@ -178,6 +178,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
           schema, tableSpec.provider, "CREATE TABLE", false)
+      GeneratedColumn.validateGeneratedColumns(
+        newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE")
+
       CreateTableExec(catalog.asTableCatalog, ident, structTypeToV2Columns(newSchema),
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
@@ -201,6 +204,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
           schema, tableSpec.provider, "CREATE TABLE", false)
+      GeneratedColumn.validateGeneratedColumns(
+        newSchema, catalog.asTableCatalog, ident.asMultipartIdentifier, "CREATE TABLE")
+
       val v2Columns = structTypeToV2Columns(newSchema)
       catalog match {
         case staging: StagingTableCatalog =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index c4dabaec888..85984f1b2a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1422,6 +1422,228 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-41290: Generated columns only allowed with TableCatalogs that " +
+    "SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS (year(eventDate)))"
+    for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+      // InMemoryTableCatalog.capabilities() = {SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS}
+      withTable(s"testcat.$tblName") {
+        if (statement == "REPLACE TABLE") {
+          sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+        }
+        // Can create table with a generated column
+        sql(s"$statement testcat.$tableDefinition USING foo")
+        assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName)))
+      }
+      // BasicInMemoryTableCatalog.capabilities() = {}
+      withSQLConf("spark.sql.catalog.dummy" -> classOf[BasicInMemoryTableCatalog].getName) {
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("USE dummy")
+            sql(s"$statement dummy.$tableDefinition USING foo")
+          },
+          errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+          parameters = Map(
+            "tableName" -> "`my_tab`",
+            "operation" -> "generated columns"
+          )
+        )
+      }
+    }
+  }
+
+  test("SPARK-41290: Column cannot have both a generation expression and a default value") {
+    val tblName = "my_tab"
+    val tableDefinition =
+      s"$tblName(eventDate DATE, eventYear INT GENERATED ALWAYS AS (year(eventDate)) DEFAULT 0)"
+    withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "foo") {
+      for (statement <- Seq("CREATE TABLE", "REPLACE TABLE")) {
+        withTable(s"testcat.$tblName") {
+          if (statement == "REPLACE TABLE") {
+            sql(s"CREATE TABLE testcat.$tblName(a INT) USING foo")
+          }
+          checkError(
+            exception = intercept[AnalysisException] {
+              sql(s"$statement testcat.$tableDefinition USING foo")
+            },
+            errorClass = "GENERATED_COLUMN_WITH_DEFAULT_VALUE",
+            parameters = Map(
+              "colName" -> "eventYear",
+              "defaultValue" -> "0",
+              "genExpr" -> "year(eventDate)")
+          )
+        }
+      }
+    }
+  }
+
+  test("SPARK-41290: Generated column expression must be valid generation expression") {
+    val tblName = "my_tab"
+    def checkUnsupportedGenerationExpression(
+        expr: String,
+        expectedReason: String,
+        genColType: String = "INT",
+        customTableDef: Option[String] = None): Unit = {
+      val tableDef =
+        s"CREATE TABLE testcat.$tblName(a INT, b $genColType GENERATED ALWAYS AS ($expr)) USING foo"
+      withTable(s"testcat.$tblName") {
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(customTableDef.getOrElse(tableDef))
+          },
+          errorClass = "UNSUPPORTED_EXPRESSION_GENERATED_COLUMN",
+          parameters = Map(
+            "fieldName" -> "b",
+            "expressionStr" -> expr,
+            "reason" -> expectedReason)
+        )
+      }
+    }
+
+    // Expression cannot be resolved since it doesn't exist
+    checkUnsupportedGenerationExpression(
+      "not_a_function(a)",
+      "failed to resolve `not_a_function` to a built-in function"
+    )
+
+    // Expression cannot be resolved since it's not a built-in function
+    spark.udf.register("timesTwo", (x: Int) => x * 2)
+    checkUnsupportedGenerationExpression(
+      "timesTwo(a)",
+      "failed to resolve `timesTwo` to a built-in function"
+    )
+
+    // Generated column can't reference itself
+    checkUnsupportedGenerationExpression(
+      "b + 1",
+      "generation expression cannot reference itself"
+    )
+    // Obeys case sensitivity when intercepting the error message
+    // Intercepts when case-insensitive
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      checkUnsupportedGenerationExpression(
+        "B + 1",
+        "generation expression cannot reference itself"
+      )
+    }
+    // Doesn't intercept when case-sensitive
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      withTable(s"testcat.$tblName") {
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"CREATE TABLE testcat.$tblName(a INT, " +
+              "b INT GENERATED ALWAYS AS (B + 1)) USING foo")
+          },
+          errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+          parameters = Map("objectName" -> "`B`", "proposal" -> "`a`"),
+          context = ExpectedContext(fragment = "B", start = 0, stop = 0)
+        )
+      }
+    }
+    // Respects case sensitivity when resolving
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      withTable(s"testcat.$tblName") {
+        sql(s"CREATE TABLE testcat.$tblName(" +
+          "a INT, b INT GENERATED ALWAYS AS (B + 1), B INT) USING foo")
+        assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName)))
+      }
+    }
+
+    // Generated column can't reference other generated columns
+    checkUnsupportedGenerationExpression(
+      "c + 1",
+      "generation expression cannot reference another generated column",
+      customTableDef = Some(
+        s"CREATE TABLE testcat.$tblName(a INT, " +
+          "b INT GENERATED ALWAYS AS (c + 1), c INT GENERATED ALWAYS AS (a + 1)) USING foo"
+      )
+    )
+    // Respects case-insensitivity
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      checkUnsupportedGenerationExpression(
+        "C + 1",
+        "generation expression cannot reference another generated column",
+        customTableDef = Some(
+          s"CREATE TABLE testcat.$tblName(a INT, " +
+            "b INT GENERATED ALWAYS AS (C + 1), c INT GENERATED ALWAYS AS (a + 1)) USING foo"
+        )
+      )
+      checkUnsupportedGenerationExpression(
+        "c + 1",
+        "generation expression cannot reference another generated column",
+        customTableDef = Some(
+          s"CREATE TABLE testcat.$tblName(a INT, " +
+            "b INT GENERATED ALWAYS AS (c + 1), C INT GENERATED ALWAYS AS (a + 1)) USING foo"
+        )
+      )
+    }
+    // Respects case sensitivity when resolving
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      withTable(s"testcat.$tblName") {
+        sql(s"CREATE TABLE testcat.$tblName(" +
+          "a INT, A INT GENERATED ALWAYS AS (a + 1), b INT GENERATED ALWAYS AS (a + 1)) USING foo")
+        assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName)))
+      }
+    }
+
+    // Generated column can't reference non-existent column
+    withTable(s"testcat.$tblName") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE testcat.$tblName(a INT, b INT GENERATED ALWAYS AS (c + 1)) USING foo")
+        },
+        errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        parameters = Map("objectName" -> "`c`", "proposal" -> "`a`"),
+        context = ExpectedContext(fragment = "c", start = 0, stop = 0)
+      )
+    }
+
+    // Expression must be deterministic
+    checkUnsupportedGenerationExpression(
+      "rand()",
+      "generation expression is not deterministic"
+    )
+
+    // Data type is incompatible
+    checkUnsupportedGenerationExpression(
+      "a + 1",
+      "generation expression data type int is incompatible with column data type boolean",
+      "BOOLEAN"
+    )
+    // But we allow valid up-casts
+    withTable(s"testcat.$tblName") {
+      sql(s"CREATE TABLE testcat.$tblName(a INT, b LONG GENERATED ALWAYS AS (a + 1)) USING foo")
+      assert(catalog("testcat").asTableCatalog.tableExists(Identifier.of(Array(), tblName)))
+    }
+
+    // No subquery expressions
+    checkUnsupportedGenerationExpression(
+      "(SELECT 1)",
+      "subquery expressions are not allowed for generated columns"
+    )
+    checkUnsupportedGenerationExpression(
+      "(SELECT (SELECT 2) + 1)", // nested
+      "subquery expressions are not allowed for generated columns"
+    )
+    checkUnsupportedGenerationExpression(
+      "(SELECT 1) + a", // refers to another column
+      "subquery expressions are not allowed for generated columns"
+    )
+    withTable("other") {
+      sql("create table other(x INT) using parquet")
+      checkUnsupportedGenerationExpression(
+        "(select min(x) from other)", // refers to another table
+        "subquery expressions are not allowed for generated columns"
+      )
+    }
+    checkUnsupportedGenerationExpression(
+      "(select min(x) from faketable)", // refers to a non-existent table
+      "subquery expressions are not allowed for generated columns"
+    )
+  }
+
   test("ShowCurrentNamespace: basic tests") {
     def testShowCurrentNamespace(expectedCatalogName: String, expectedNamespace: String): Unit = {
       val schema = new StructType()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 3d88d4f7ab9..4f3f993d7de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2181,6 +2181,17 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       assert(spark.sessionState.catalog.isRegisteredFunction(rand))
     }
   }
+
+  test("SPARK-41290: No generated columns with V1") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(s"create table t(a int, b int generated always as (a + 1)) using parquet")
+      },
+      errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+      parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`",
+        "operation" -> "generated columns")
+    )
+  }
 }
 
 object FakeLocalFsFileSystem {

