You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/19 06:24:52 UTC

[GitHub] [spark] maropu commented on a change in pull request #30412: [SPARK-33480][SQL] Support char/varchar type

maropu commented on a change in pull request #30412:
URL: https://github.com/apache/spark/pull/30412#discussion_r526598186



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -94,6 +94,10 @@ trait CheckAnalysis extends PredicateHelper {
 
       case p if p.analyzed => // Skip already analyzed sub-plans
 
+      case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>

Review comment:
       > To simplify the implementation, this PR doesn't propagate char/varchar type info through functions/operators(e.g. substring).
   
   For checking it does not propagates char/varchar types, how about checking this in all logical nodes? It seems, in the current implementation, we can pass char/varchar types into logical plans, e.g., via `functions.from_json(col, schema having char/vchar types)`?
   

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types._
+
+object CharVarcharUtils {
+
+  private val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
+
+  /**
+   * Replaces CharType/VarcharType with StringType recursively in the given struct type. If a
+   * top-level StructField's data type is CharType/VarcharType or has nested CharType/VarcharType,
+   * this method will add the original type string to the StructField's metadata, so that we can
+   * re-construct the original data type with CharType/VarcharType later when needed.
+   */
+  def replaceCharVarcharWithStringInSchema(st: StructType): StructType = {
+    StructType(st.map { field =>
+      if (hasCharVarchar(field.dataType)) {
+        val metadata = new MetadataBuilder().withMetadata(field.metadata)
+          .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, field.dataType.sql).build()
+        field.copy(dataType = replaceCharVarcharWithString(field.dataType), metadata = metadata)
+      } else {
+        field
+      }
+    })
+  }
+
+  /**
+   * Returns true if the given data type is CharType/VarcharType or has nested CharType/VarcharType.
+   */
+  def hasCharVarchar(dt: DataType): Boolean = {
+    dt.existsRecursively(f => f.isInstanceOf[CharType] || f.isInstanceOf[VarcharType])
+  }
+
+  /**
+   * Replaces CharType/VarcharType with StringType recursively in the given data type.
+   */
+  def replaceCharVarcharWithString(dt: DataType): DataType = dt match {
+    case ArrayType(et, nullable) =>
+      ArrayType(replaceCharVarcharWithString(et), nullable)
+    case MapType(kt, vt, nullable) =>
+      MapType(replaceCharVarcharWithString(kt), replaceCharVarcharWithString(vt), nullable)
+    case StructType(fields) =>
+      StructType(fields.map { field =>
+        field.copy(dataType = replaceCharVarcharWithString(field.dataType))
+      })
+    case _: CharType => StringType
+    case _: VarcharType => StringType
+    case _ => dt
+  }
+
+  /**
+   * Removes the metadata entry that contains the original type string of CharType/VarcharType from
+   * the given attribute's metadata.
+   */
+  def cleanAttrMetadata(attr: AttributeReference): AttributeReference = {
+    val cleaned = new MetadataBuilder().withMetadata(attr.metadata)
+      .remove(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY).build()
+    attr.withMetadata(cleaned)
+  }
+
+  /**
+   * Re-construct the original data type from the type string in the given metadata.
+   * This is needed when dealing with char/varchar columns/fields.
+   */
+  def getRawType(metadata: Metadata): Option[DataType] = {
+    if (metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) {
+      Some(CatalystSqlParser.parseRawDataType(
+        metadata.getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Re-construct the original StructType from the type strings in the metadata of StructFields.
+   * This is needed when dealing with char/varchar columns/fields.
+   */
+  def getRawSchema(schema: StructType): StructType = {
+    StructType(schema.map { field =>
+      getRawType(field.metadata).map(rawType => field.copy(dataType = rawType)).getOrElse(field)
+    })
+  }
+
+  /**
+   * Returns expressions to apply read-side char type padding for the given attributes. String
+   * values should be right-padded to N characters if it's from a CHAR(N) column/field.
+   */
+  def charTypePadding(output: Seq[AttributeReference]): Seq[NamedExpression] = {
+    output.map { attr =>
+      getRawType(attr.metadata).filter { rawType =>
+        rawType.existsRecursively(_.isInstanceOf[CharType])
+      }.map { rawType =>
+        Alias(charTypePadding(attr, rawType), attr.name)(explicitMetadata = Some(attr.metadata))
+      }.getOrElse(attr)
+    }
+  }
+
+  private def charTypePadding(expr: Expression, dt: DataType): Expression = dt match {
+    case CharType(length) => StringRPad(expr, Literal(length))
+
+    case StructType(fields) =>
+      CreateNamedStruct(fields.zipWithIndex.flatMap { case (f, i) =>
+        Seq(Literal(f.name), charTypePadding(GetStructField(expr, i, Some(f.name)), f.dataType))
+      })
+
+    case ArrayType(et, containsNull) => charTypePaddingInArray(expr, et, containsNull)
+
+    case MapType(kt, vt, valueContainsNull) =>
+      val newKeys = charTypePaddingInArray(MapKeys(expr), kt, containsNull = false)
+      val newValues = charTypePaddingInArray(MapValues(expr), vt, valueContainsNull)
+      MapFromArrays(newKeys, newValues)
+
+    case _ => expr
+  }
+
+  private def charTypePaddingInArray(
+      arr: Expression, et: DataType, containsNull: Boolean): Expression = {
+    val param = NamedLambdaVariable("x", replaceCharVarcharWithString(et), containsNull)
+    val func = LambdaFunction(charTypePadding(param, et), Seq(param))
+    ArrayTransform(arr, func)
+  }
+
+  /**
+   * Returns an expression to apply write-side char type padding for the given expression. A string
+   * value can not exceed N characters if it's written into a CHAR(N)/VARCHAR(N) column/field.
+   */
+  def stringLengthCheck(expr: Expression, targetAttr: Attribute): Expression = {
+    getRawType(targetAttr.metadata).map { rawType =>
+      stringLengthCheck(expr, rawType)
+    }.getOrElse(expr)
+  }
+
+  private def stringLengthCheck(expr: Expression, dt: DataType): Expression = dt match {
+    case CharType(length) =>
+      val trimmed = StringTrimRight(expr)
+      val errorMsg = Concat(Seq(

Review comment:
       nit: How about extract `errorMsg` as a private method?
   ```
     private def raiseError(expr: Expression, typeName: String, length: Int): Expression = {
       val errorMsg = Concat(Seq(
         Literal("input string '"),
         expr,
         Literal(s"' exceeds $typeName type length limitation: $length")))
       Cast(RaiseError(errorMsg), StringType)
     }
   
     private def stringLengthCheck(expr: Expression, dt: DataType): Expression = dt match {
       case CharType(length) =>
         val trimmed = StringTrimRight(expr)
         // Trailing spaces do not count in the length check. We don't need to retain the trailing
         // spaces, as we will pad char type columns/fields at read time.
         If(
           GreaterThan(Length(trimmed), Literal(length)),
           raiseError(expr, "char", length),
           trimmed)
   ...
   ```

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3088,7 +3088,12 @@ class Analyzer(override val catalogManager: CatalogManager)
         val projection = TableOutputResolver.resolveOutputColumns(
           v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
         if (projection != v2Write.query) {
-          v2Write.withNewQuery(projection)
+          val cleanedTable = v2Write.table match {
+            case r: DataSourceV2Relation =>
+              r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))

Review comment:
       Does the current implementation assume the analyzer removes the metadata in plans before the optimizer phase? If so, how about checking if plans don't have the metadata in `CheckAnalysis`?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
##########
@@ -93,19 +94,17 @@ object TableOutputResolver {
       tableAttr.metadata == queryExpr.metadata) {
       Some(queryExpr)
     } else {
-      // Renaming is needed for handling the following cases like
-      // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
-      // 2) Target tables have column metadata
-      storeAssignmentPolicy match {
+      val casted = storeAssignmentPolicy match {
         case StoreAssignmentPolicy.ANSI =>
-          Some(Alias(
-            AnsiCast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone)),
-            tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))
+          AnsiCast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
         case _ =>
-          Some(Alias(
-            Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone)),
-            tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))
+          Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
       }
+      val strLenChecked = CharVarcharUtils.stringLengthCheck(casted, tableAttr)

Review comment:
       To avoid accidentally adding the length check exprs again, we cannot remove the metadata at the same time as adding the exprs?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
##########
@@ -473,7 +473,13 @@ class SessionCatalog(
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.getTable(db, table)
+    removeCharVarcharFromTableSchema(externalCatalog.getTable(db, table))
+  }
+
+  // We replace char/varchar with string type in the table schema, as Spark's type system doesn't
+  // support char/varchar yet.
+  private def removeCharVarcharFromTableSchema(t: CatalogTable): CatalogTable = {

Review comment:
       Could you inline this in L476?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/types/VarcharType.scala
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.unsafe.types.UTF8String
+
+@Experimental
+case class VarcharType(length: Int) extends AtomicType {

Review comment:
       Also, how about setting a reasonable max length for these types like postgresql does so?
   ```
   postgres=# create table t (v char(100000000));
   ERROR:  length for type char cannot exceed 10485760
   LINE 1: create table t (v char(100000000));
   ```

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/types/VarcharType.scala
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.unsafe.types.UTF8String
+
+@Experimental
+case class VarcharType(length: Int) extends AtomicType {

Review comment:
       How about checking if `length` has a valid number in the constructor of char/vchar? 
   ```
     if (length > 0) {
       throw new AnalysisException("XXX")
     }
   ```

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
##########
@@ -93,19 +94,17 @@ object TableOutputResolver {
       tableAttr.metadata == queryExpr.metadata) {
       Some(queryExpr)
     } else {
-      // Renaming is needed for handling the following cases like
-      // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
-      // 2) Target tables have column metadata
-      storeAssignmentPolicy match {
+      val casted = storeAssignmentPolicy match {
         case StoreAssignmentPolicy.ANSI =>
-          Some(Alias(
-            AnsiCast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone)),
-            tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))
+          AnsiCast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
         case _ =>
-          Some(Alias(
-            Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone)),
-            tableAttr.name)(explicitMetadata = Option(tableAttr.metadata)))
+          Cast(queryExpr, tableAttr.dataType, Option(conf.sessionLocalTimeZone))
       }
+      val strLenChecked = CharVarcharUtils.stringLengthCheck(casted, tableAttr)

Review comment:
       nit: `strLenChecked` -> `exprWithStrLenCheck`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org