You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/24 14:39:54 UTC

[GitHub] [spark] gengliangwang commented on a change in pull request #35855: [SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE statements

gengliangwang commented on a change in pull request #35855:
URL: https://github.com/apache/spark/pull/35855#discussion_r834350327



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains fields to help process DEFAULT columns.
+ */
+object ResolveDefaultColumns {
+  // This column metadata indicates the default value associated with a particular table column that
+  // is in effect at any given time. Its value begins at the time of the initial CREATE/REPLACE
+  // TABLE statement with DEFAULT column definition(s), if any. It then changes whenever an ALTER
+  // TABLE statement SETs the DEFAULT. The intent is for this "current default" to be used by
+  // UPDATE, INSERT and MERGE, which evaluate each default expression for each row.
+  val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
+  // This column metadata represents the default value for all existing rows in a table after a
+  // column has been added. This value is determined at time of CREATE TABLE, REPLACE TABLE, or
+  // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for this "exist default"
+  // to be used by any scan when the columns in the source row are missing data. For example,
+  // consider the following sequence:
+  // CREATE TABLE t (c1 INT)
+  // INSERT INTO t VALUES (42)
+  // ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
+  // SELECT c1, c2 FROM t
+  // In this case, the final query is expected to return 42, 43. The ALTER TABLE ADD COLUMNS command
+  // executed after there was already data in the table, so in order to enforce this invariant,
+  // we need either (1) an expensive backfill of value 43 at column c2 into all previous rows, or
+  // (2) indicate to each data source that selected columns missing data are to generate the
+  // corresponding DEFAULT value instead. We choose option (2) for efficiency, and represent this
+  // value as the text representation of a folded constant in the "EXISTS_DEFAULT" column metadata.
+  val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
+  // Name of attributes representing explicit references to the value stored in the above
+  // CURRENT_DEFAULT_COLUMN_METADATA.
+  val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"

Review comment:
       nit: `CURRENT_DEFAULT_COLUMN_NAME` is not used

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains fields to help process DEFAULT columns.
+ */
+object ResolveDefaultColumns {
+  // This column metadata indicates the default value associated with a particular table column that
+  // is in effect at any given time. Its value begins at the time of the initial CREATE/REPLACE
+  // TABLE statement with DEFAULT column definition(s), if any. It then changes whenever an ALTER
+  // TABLE statement SETs the DEFAULT. The intent is for this "current default" to be used by
+  // UPDATE, INSERT and MERGE, which evaluate each default expression for each row.
+  val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
+  // This column metadata represents the default value for all existing rows in a table after a
+  // column has been added. This value is determined at time of CREATE TABLE, REPLACE TABLE, or
+  // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for this "exist default"
+  // to be used by any scan when the columns in the source row are missing data. For example,
+  // consider the following sequence:
+  // CREATE TABLE t (c1 INT)
+  // INSERT INTO t VALUES (42)
+  // ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
+  // SELECT c1, c2 FROM t
+  // In this case, the final query is expected to return 42, 43. The ALTER TABLE ADD COLUMNS command
+  // executed after there was already data in the table, so in order to enforce this invariant,
+  // we need either (1) an expensive backfill of value 43 at column c2 into all previous rows, or
+  // (2) indicate to each data source that selected columns missing data are to generate the
+  // corresponding DEFAULT value instead. We choose option (2) for efficiency, and represent this
+  // value as the text representation of a folded constant in the "EXISTS_DEFAULT" column metadata.
+  val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
+  // Name of attributes representing explicit references to the value stored in the above
+  // CURRENT_DEFAULT_COLUMN_METADATA.
+  val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
+
+  /**
+   * Finds "current default" expressions in CREATE/REPLACE TABLE columns and constant-folds them.
+   *
+   * The results are stored in the "exists default" metadata of the same columns. For example, in
+   * the event of this statement:
+   *
+   * CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)
+   *
+   * This method constant-folds the "current default" value, stored in the CURRENT_DEFAULT metadata
+   * of the "b" column, to "10", storing the result in the "exists default" value within the
+   * EXISTS_DEFAULT metadata of that same column. Meanwhile the "current default" metadata of this
+   * "b" column retains its original value of "5 + 5".
+   *
+   * @param tableSchema represents the names and types of the columns of the statement to process.
+   * @param statementType name of the statement being processed, such as INSERT; useful for errors.
+   * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
+   */
+  def constantFoldCurrentDefaultsToExistDefaults(
+      tableSchema: StructType, statementType: String): StructType = {

Review comment:
       ```suggestion
         tableSchema: StructType,
         statementType: String): StructType = {
   ```

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
##########
@@ -1025,16 +1024,30 @@ abstract class CatalogTestUtils {
 
   def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db))
 
-  def newTable(name: String, database: Option[String] = None): CatalogTable = {
+  def newTable(name: String,
+               database: Option[String] = None,
+               defaultColumns: Boolean = false): CatalogTable = {

Review comment:
       ```suggestion
     def newTable(
         name: String,
         database: Option[String] = None,
         defaultColumns: Boolean = false): CatalogTable = {
   ```

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains fields to help process DEFAULT columns.
+ */
+object ResolveDefaultColumns {
+  // This column metadata indicates the default value associated with a particular table column that
+  // is in effect at any given time. Its value begins at the time of the initial CREATE/REPLACE
+  // TABLE statement with DEFAULT column definition(s), if any. It then changes whenever an ALTER
+  // TABLE statement SETs the DEFAULT. The intent is for this "current default" to be used by
+  // UPDATE, INSERT and MERGE, which evaluate each default expression for each row.
+  val CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"
+  // This column metadata represents the default value for all existing rows in a table after a
+  // column has been added. This value is determined at time of CREATE TABLE, REPLACE TABLE, or
+  // ALTER TABLE ADD COLUMN, and never changes thereafter. The intent is for this "exist default"
+  // to be used by any scan when the columns in the source row are missing data. For example,
+  // consider the following sequence:
+  // CREATE TABLE t (c1 INT)
+  // INSERT INTO t VALUES (42)
+  // ALTER TABLE t ADD COLUMNS (c2 INT DEFAULT 43)
+  // SELECT c1, c2 FROM t
+  // In this case, the final query is expected to return 42, 43. The ALTER TABLE ADD COLUMNS command
+  // executed after there was already data in the table, so in order to enforce this invariant,
+  // we need either (1) an expensive backfill of value 43 at column c2 into all previous rows, or
+  // (2) indicate to each data source that selected columns missing data are to generate the
+  // corresponding DEFAULT value instead. We choose option (2) for efficiency, and represent this
+  // value as the text representation of a folded constant in the "EXISTS_DEFAULT" column metadata.
+  val EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"
+  // Name of attributes representing explicit references to the value stored in the above
+  // CURRENT_DEFAULT_COLUMN_METADATA.
+  val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
+
+  /**
+   * Finds "current default" expressions in CREATE/REPLACE TABLE columns and constant-folds them.
+   *
+   * The results are stored in the "exists default" metadata of the same columns. For example, in
+   * the event of this statement:
+   *
+   * CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)
+   *
+   * This method constant-folds the "current default" value, stored in the CURRENT_DEFAULT metadata
+   * of the "b" column, to "10", storing the result in the "exists default" value within the
+   * EXISTS_DEFAULT metadata of that same column. Meanwhile the "current default" metadata of this
+   * "b" column retains its original value of "5 + 5".
+   *
+   * @param tableSchema represents the names and types of the columns of the statement to process.
+   * @param statementType name of the statement being processed, such as INSERT; useful for errors.
+   * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
+   */
+  def constantFoldCurrentDefaultsToExistDefaults(
+      tableSchema: StructType, statementType: String): StructType = {
+    if (!SQLConf.get.enableDefaultColumns) {
+      return tableSchema
+    }
+    val newFields: Seq[StructField] = tableSchema.fields.map { field =>
+      if (field.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) {
+        val analyzed: Expression = analyze(field, statementType, foldConstants = true)
+        val newMetadata: Metadata = new MetadataBuilder().withMetadata(field.metadata)
+          .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, analyzed.sql).build()
+        field.copy(metadata = newMetadata)
+      } else {
+        field
+      }
+    }
+    StructType(newFields)
+  }
+
+  /**
+   * Parses and analyzes the DEFAULT column text in `field`, returning an error upon failure.
+   *
+   * @param field represents the DEFAULT column value whose "default" metadata to parse and analyze.
+   * @param statementType which type of statement we are running, such as INSERT; useful for errors.
+   * @param foldConstants if true, perform constant-folding on the analyzed value before returning.
+   * @return Result of the analysis and constant-folding operation.
+   */
+  private def analyze(
+      field: StructField, statementType: String, foldConstants: Boolean = false): Expression = {

Review comment:
       ```suggestion
         field: StructField,
         statementType: String,
         foldConstants: Boolean = false): Expression = {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org