You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/07 07:55:48 UTC

[GitHub] [spark] cloud-fan commented on a change in pull request #25305: [SPARK-28572][SQL] Simple analyzer checks for v2 table creation code paths

cloud-fan commented on a change in pull request #25305: [SPARK-28572][SQL] Simple analyzer checks for v2 table creation code paths
URL: https://github.com/apache/spark/pull/25305#discussion_r311414840
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
 ##########
 @@ -88,4 +91,170 @@ private[spark] object SchemaUtils {
         s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
     }
   }
+
+  /**
+   * Returns all column names in this schema as a flat list. For example, a schema like:
+   *   | - a
+   *   | | - 1
+   *   | | - 2
+   *   | - b
+   *   | - c
+   *   | | - nest
+   *   |   | - 3
+   *   will get flattened to: "a", "a.1", "a.2", "b", "c", "c.nest", "c.nest.3"
+   */
+  def explodeNestedFieldNames(schema: StructType): Seq[String] = {
+    def explode(schema: StructType): Seq[Seq[String]] = {
+      def recurseIntoComplexTypes(complexType: DataType): Seq[Seq[String]] = {
+        complexType match {
+          case s: StructType => explode(s)
+          case a: ArrayType => recurseIntoComplexTypes(a.elementType)
+          case m: MapType =>
+            recurseIntoComplexTypes(m.keyType).map(Seq("key") ++ _) ++
+              recurseIntoComplexTypes(m.valueType).map(Seq("value") ++ _)
+          case _ => Nil
+        }
+      }
+
+      schema.flatMap {
+        case StructField(name, s: StructType, _, _) =>
+          Seq(Seq(name)) ++ explode(s).map(nested => Seq(name) ++ nested)
+        case StructField(name, a: ArrayType, _, _) =>
+          Seq(Seq(name)) ++ recurseIntoComplexTypes(a).map(nested => Seq(name) ++ nested)
+        case StructField(name, m: MapType, _, _) =>
+          Seq(Seq(name)) ++ recurseIntoComplexTypes(m).map(nested => Seq(name) ++ nested)
+        case f => Seq(f.name) :: Nil
+      }
+    }
+
+    explode(schema).map(UnresolvedAttribute.apply(_).name)
+  }
+
+  /**
+   * Checks if input column names have duplicate identifiers even in if they are nested. This
+   * throws an exception if the duplication exists.
+   *
+   * @param schema the schema to check for duplicates
+   * @param checkType contextual information around the check, used in an exception message
+   * @param isCaseSensitive Whether to be case sensitive when comparing column names
+   */
+  def checkV2ColumnNameDuplication(
+      schema: StructType,
+      checkType: String,
+      isCaseSensitive: Boolean): Unit = {
+    val columnNames = explodeNestedFieldNames(schema)
+    checkColumnNameDuplication(columnNames, checkType, isCaseSensitive)
+  }
+
+  /**
+   * Checks if the partitioning transforms are being duplicated or not. Throws an exception if
+   * duplication exists.
+   *
 +   * @param transforms the partitioning transforms to check for duplicates
+   * @param checkType contextual information around the check, used in an exception message
+   * @param isCaseSensitive Whether to be case sensitive when comparing column names
+   */
+  def checkTransformDuplication(
+      transforms: Seq[Transform],
+      checkType: String,
+      isCaseSensitive: Boolean): Unit = {
+    val extractedTransforms = transforms.map {
+      case b: BucketTransform =>
+        val colNames = b.columns.map(c => UnresolvedAttribute(c.fieldNames()).name)
+        // We need to check that we're not duplicating columns within our bucketing transform
+        checkColumnNameDuplication(colNames, "in the bucket definition", isCaseSensitive)
+        b.name -> colNames
+      case NamedTransform(transformName, refs) =>
+        val fieldNameParts =
+          refs.collect { case FieldReference(parts) => UnresolvedAttribute(parts).name }
+        // We could also check that we're not duplicating column names here as well if
+        // fieldNameParts.length > 1, but we're specifically not, because certain transforms can
+        // be defined where this is a legitimate use case.
+        transformName -> fieldNameParts
+    }
+    val normalizedTransforms = if (isCaseSensitive) {
+      extractedTransforms
+    } else {
+      extractedTransforms.map(t => t._1 -> t._2.map(_.toLowerCase(Locale.getDefault)))
 
 Review comment:
   nit: in other places of the code base we always use `toLowerCase(Locale.ROOT)`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org