Posted to commits@spark.apache.org by we...@apache.org on 2017/07/10 07:58:40 UTC

spark git commit: [SPARK-20460][SQL] Make it more consistent to handle column name duplication

Repository: spark
Updated Branches:
  refs/heads/master c444d1086 -> 647963a26


[SPARK-20460][SQL] Make it more consistent to handle column name duplication

## What changes were proposed in this pull request?
This PR makes the handling of column name duplication more consistent. In the current master, error handling differs depending on where the duplication is hit:
```
// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#12, a#13.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Duplicate column(s) : "a" found, cannot save to JSON format;
  at org.apache.spark.sql.execution.datasources.json.JsonDataSource.checkConstraints(JsonDataSource.scala:81)
  at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:63)
  at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:57)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)

// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#41, a#42.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)

// If `inferSchema` is true, the CSV format is duplicate-safe (see SPARK-16896)
scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#110, a#111.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
```
With this patch applied, the results change to:
```
// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:156)

// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
```

## How was this patch tested?
Added tests in `DataFrameReaderWriterSuite` and `SQLQueryTestSuite`.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #17758 from maropu/SPARK-20460.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/647963a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/647963a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/647963a2

Branch: refs/heads/master
Commit: 647963a26a2d4468ebd9b68111ebe68bee501fde
Parents: c444d10
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Mon Jul 10 15:58:34 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jul 10 15:58:34 2017 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 16 +---
 .../org/apache/spark/sql/util/SchemaUtils.scala | 58 ++++++++++---
 .../spark/sql/util/SchemaUtilsSuite.scala       | 83 ++++++++++++++++++
 .../command/createDataSourceTables.scala        |  2 -
 .../spark/sql/execution/command/tables.scala    |  8 +-
 .../spark/sql/execution/command/views.scala     |  9 +-
 .../sql/execution/datasources/DataSource.scala  | 43 ++++++++--
 .../InsertIntoHadoopFsRelationCommand.scala     | 14 ++--
 .../datasources/PartitioningUtils.scala         | 10 +--
 .../execution/datasources/jdbc/JdbcUtils.scala  | 11 +--
 .../datasources/json/JsonDataSource.scala       | 15 +---
 .../spark/sql/execution/datasources/rules.scala | 36 ++++----
 .../org/apache/spark/sql/DataFrameSuite.scala   |  4 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 56 +++++++++----
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  4 +-
 .../sql/sources/ResolvedDataSourceSuite.scala   |  5 +-
 .../sql/streaming/FileStreamSinkSuite.scala     | 37 ++++++++
 .../sql/test/DataFrameReaderWriterSuite.scala   | 88 ++++++++++++++++++++
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  2 +-
 19 files changed, 382 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c40d5f6..b44d2ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
@@ -188,19 +188,6 @@ class SessionCatalog(
     }
   }
 
-  private def checkDuplication(fields: Seq[StructField]): Unit = {
-    val columnNames = if (conf.caseSensitiveAnalysis) {
-      fields.map(_.name)
-    } else {
-      fields.map(_.name.toLowerCase)
-    }
-    if (columnNames.distinct.length != columnNames.length) {
-      val duplicateColumns = columnNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => x
-      }
-      throw new AnalysisException(s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
-    }
-  }
   // ----------------------------------------------------------------------------
   // Databases
   // ----------------------------------------------------------------------------
@@ -353,7 +340,6 @@ class SessionCatalog(
     val tableIdentifier = TableIdentifier(table, Some(db))
     requireDbExists(db)
     requireTableExists(tableIdentifier)
-    checkDuplication(newSchema)
 
     val catalogTable = externalCatalog.getTable(db, table)
     val oldSchema = catalogTable.schema

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index e881685..41ca270 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.util
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.types.StructType
 
 
 /**
@@ -25,29 +27,63 @@ import org.apache.spark.internal.Logging
  *
  * TODO: Merge this file with [[org.apache.spark.ml.util.SchemaUtils]].
  */
-private[spark] object SchemaUtils extends Logging {
+private[spark] object SchemaUtils {
 
   /**
-   * Checks if input column names have duplicate identifiers. Prints a warning message if
+   * Checks if an input schema has duplicate column names. This throws an exception if the
+   * duplication exists.
+   *
+   * @param schema schema to check
+   * @param colType column type name, used in an exception message
+   * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
+   */
+  def checkSchemaColumnNameDuplication(
+      schema: StructType, colType: String, caseSensitiveAnalysis: Boolean = false): Unit = {
+    checkColumnNameDuplication(schema.map(_.name), colType, caseSensitiveAnalysis)
+  }
+
+  // Returns true if a given resolver is case-sensitive
+  private def isCaseSensitiveAnalysis(resolver: Resolver): Boolean = {
+    if (resolver == caseSensitiveResolution) {
+      true
+    } else if (resolver == caseInsensitiveResolution) {
+      false
+    } else {
+      sys.error("A resolver to check if two identifiers are equal must be " +
+        "`caseSensitiveResolution` or `caseInsensitiveResolution` in o.a.s.sql.catalyst.")
+    }
+  }
+
+  /**
+   * Checks if input column names have duplicate identifiers. This throws an exception if
    * the duplication exists.
    *
    * @param columnNames column names to check
-   * @param colType column type name, used in a warning message
+   * @param colType column type name, used in an exception message
+   * @param resolver resolver used to determine if two identifiers are equal
+   */
+  def checkColumnNameDuplication(
+      columnNames: Seq[String], colType: String, resolver: Resolver): Unit = {
+    checkColumnNameDuplication(columnNames, colType, isCaseSensitiveAnalysis(resolver))
+  }
+
+  /**
+   * Checks if input column names have duplicate identifiers. This throws an exception if
+   * the duplication exists.
+   *
+   * @param columnNames column names to check
+   * @param colType column type name, used in an exception message
    * @param caseSensitiveAnalysis whether duplication checks should be case sensitive or not
    */
   def checkColumnNameDuplication(
       columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit = {
-    val names = if (caseSensitiveAnalysis) {
-      columnNames
-    } else {
-      columnNames.map(_.toLowerCase)
-    }
+    val names = if (caseSensitiveAnalysis) columnNames else columnNames.map(_.toLowerCase)
     if (names.distinct.length != names.length) {
       val duplicateColumns = names.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => s"`$x`"
       }
-      logWarning(s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}. " +
-        "You might need to assign different column names.")
+      throw new AnalysisException(
+        s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
     }
   }
 }

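For reference, a minimal sketch of how the new `SchemaUtils` entry points are meant to be called (the schema and message prefix here are illustrative; `SchemaUtils` is `private[spark]`, so this only compiles inside Spark's own packages):

```
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.SchemaUtils

val schema = StructType.fromDDL("a INT, A INT")

// Case-insensitive check: "a" and "A" collide, so this throws
// org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `a`
SchemaUtils.checkSchemaColumnNameDuplication(
  schema, "in the data schema", caseSensitiveAnalysis = false)

// Resolver-based overload; equivalent to the call above
SchemaUtils.checkColumnNameDuplication(
  schema.map(_.name), "in the data schema", caseInsensitiveResolution)
```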
http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
new file mode 100644
index 0000000..a25be2f
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.types.StructType
+
+class SchemaUtilsSuite extends SparkFunSuite {
+
+  private def resolver(caseSensitiveAnalysis: Boolean): Resolver = {
+    if (caseSensitiveAnalysis) {
+      caseSensitiveResolution
+    } else {
+      caseInsensitiveResolution
+    }
+  }
+
+  Seq((true, ("a", "a"), ("b", "b")), (false, ("a", "A"), ("b", "B"))).foreach {
+      case (caseSensitive, (a0, a1), (b0, b1)) =>
+
+    val testType = if (caseSensitive) "case-sensitive" else "case-insensitive"
+    test(s"Check column name duplication in $testType cases") {
+      def checkExceptionCases(schemaStr: String, duplicatedColumns: Seq[String]): Unit = {
+        val expectedErrorMsg = "Found duplicate column(s) in SchemaUtilsSuite: " +
+          duplicatedColumns.map(c => s"`${c.toLowerCase}`").mkString(", ")
+        val schema = StructType.fromDDL(schemaStr)
+        var msg = intercept[AnalysisException] {
+          SchemaUtils.checkSchemaColumnNameDuplication(
+            schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
+        }.getMessage
+        assert(msg.contains(expectedErrorMsg))
+        msg = intercept[AnalysisException] {
+          SchemaUtils.checkColumnNameDuplication(
+            schema.map(_.name), "in SchemaUtilsSuite", resolver(caseSensitive))
+        }.getMessage
+        assert(msg.contains(expectedErrorMsg))
+        msg = intercept[AnalysisException] {
+          SchemaUtils.checkColumnNameDuplication(
+            schema.map(_.name), "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
+        }.getMessage
+        assert(msg.contains(expectedErrorMsg))
+      }
+
+      checkExceptionCases(s"$a0 INT, b INT, $a1 INT", a0 :: Nil)
+      checkExceptionCases(s"$a0 INT, b INT, $a1 INT, $a0 INT", a0 :: Nil)
+      checkExceptionCases(s"$a0 INT, $b0 INT, $a1 INT, $a0 INT, $b1 INT", b0 :: a0 :: Nil)
+    }
+  }
+
+  test("Check no exception thrown for valid schemas") {
+    def checkNoExceptionCases(schemaStr: String, caseSensitive: Boolean): Unit = {
+      val schema = StructType.fromDDL(schemaStr)
+      SchemaUtils.checkSchemaColumnNameDuplication(
+        schema, "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
+      SchemaUtils.checkColumnNameDuplication(
+        schema.map(_.name), "in SchemaUtilsSuite", resolver(caseSensitive))
+      SchemaUtils.checkColumnNameDuplication(
+        schema.map(_.name), "in SchemaUtilsSuite", caseSensitiveAnalysis = caseSensitive)
+    }
+
+    checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = true)
+    checkNoExceptionCases("Aa INT, b INT, aA INT", caseSensitive = true)
+
+    checkNoExceptionCases("a INT, b INT, c INT", caseSensitive = false)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 729bd39..04b2534 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import java.net.URI
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 8ded106..fa50d12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -20,13 +20,11 @@ package org.apache.spark.sql.execution.command
 import java.io.File
 import java.net.URI
 import java.nio.file.FileSystems
-import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 import scala.util.Try
 
-import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -42,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.Utils
 
 /**
@@ -202,6 +201,11 @@ case class AlterTableAddColumnsCommand(
 
     // make sure any partition columns are at the end of the fields
     val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
+
+    SchemaUtils.checkColumnNameDuplication(
+      reorderedSchema.map(_.name), "in the table definition of " + table.identifier,
+      conf.caseSensitiveAnalysis)
+
     catalog.alterTableSchema(
       table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
 

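With this check in place, adding a column that already exists in the table (modulo case sensitivity) should now fail up front. A sketch of the assumed behavior; the exact identifier rendering in the message is not shown in this diff:

```
scala> sql("CREATE TABLE t(a INT) USING parquet")
scala> sql("ALTER TABLE t ADD COLUMNS (a STRING)")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the table definition of `t`: `a`
```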
http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index a6d56ca..ffdfd52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
 import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
 
 
 /**
@@ -355,15 +356,15 @@ object ViewHelper {
       properties: Map[String, String],
       session: SparkSession,
       analyzedPlan: LogicalPlan): Map[String, String] = {
+    val queryOutput = analyzedPlan.schema.fieldNames
+
     // Generate the query column names, throw an AnalysisException if there exists duplicate column
     // names.
-    val queryOutput = analyzedPlan.schema.fieldNames
-    assert(queryOutput.distinct.size == queryOutput.size,
-      s"The view output ${queryOutput.mkString("(", ",", ")")} contains duplicate column name.")
+    SchemaUtils.checkColumnNameDuplication(
+      queryOutput, "in the view definition", session.sessionState.conf.resolver)
 
     // Generate the view default database name.
     val viewDefaultDatabase = session.sessionState.catalog.getCurrentDatabase
-
     removeQueryColumnNames(properties) ++
       generateViewDefaultDatabase(viewDefaultDatabase) ++
       generateQueryColumnNames(queryOutput)

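This mirrors the new `DDLSuite` test added below: a duplicate output column in a view definition now surfaces as an `AnalysisException` instead of an assertion failure:

```
scala> sql("CREATE VIEW v AS SELECT * FROM VALUES (1, 1) AS t(a, a)")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the view definition: `a`
```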
http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 75e5306..d36a04f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -87,6 +87,14 @@ case class DataSource(
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
+  private val equality = sparkSession.sessionState.conf.resolver
+
+  bucketSpec.map { bucket =>
+    SchemaUtils.checkColumnNameDuplication(
+      bucket.bucketColumnNames, "in the bucket definition", equality)
+    SchemaUtils.checkColumnNameDuplication(
+      bucket.sortColumnNames, "in the sort definition", equality)
+  }
 
   /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
@@ -132,7 +140,6 @@ case class DataSource(
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
       val resolved = tempFileIndex.partitionSchema.map { partitionField =>
-        val equality = sparkSession.sessionState.conf.resolver
         // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
         userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
           partitionField)
@@ -146,7 +153,6 @@ case class DataSource(
         inferredPartitions
       } else {
         val partitionFields = partitionColumns.map { partitionColumn =>
-          val equality = sparkSession.sessionState.conf.resolver
           userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
             val inferredPartitions = tempFileIndex.partitionSchema
             val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
@@ -172,7 +178,6 @@ case class DataSource(
     }
 
     val dataSchema = userSpecifiedSchema.map { schema =>
-      val equality = sparkSession.sessionState.conf.resolver
       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
     }.orElse {
       format.inferSchema(
@@ -184,9 +189,18 @@ case class DataSource(
         s"Unable to infer schema for $format. It must be specified manually.")
     }
 
-    SchemaUtils.checkColumnNameDuplication(
-      (dataSchema ++ partitionSchema).map(_.name), "in the data schema and the partition schema",
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    // We just print a warning message if the data schema and partition schema have duplicate
+    // columns. This is because we allowed users to do so in previous Spark releases and
+    // we have existing tests for these cases (e.g., `ParquetHadoopFsRelationSuite`).
+    // See SPARK-18108 and SPARK-21144 for related discussions.
+    try {
+      SchemaUtils.checkColumnNameDuplication(
+        (dataSchema ++ partitionSchema).map(_.name),
+        "in the data schema and the partition schema",
+        equality)
+    } catch {
+      case e: AnalysisException => logWarning(e.getMessage)
+    }
 
     (dataSchema, partitionSchema)
   }
@@ -391,6 +405,23 @@ case class DataSource(
           s"$className is not a valid Spark SQL Data Source.")
     }
 
+    relation match {
+      case hs: HadoopFsRelation =>
+        SchemaUtils.checkColumnNameDuplication(
+          hs.dataSchema.map(_.name),
+          "in the data schema",
+          equality)
+        SchemaUtils.checkColumnNameDuplication(
+          hs.partitionSchema.map(_.name),
+          "in the partition schema",
+          equality)
+      case _ =>
+        SchemaUtils.checkColumnNameDuplication(
+          relation.schema.map(_.name),
+          "in the data schema",
+          equality)
+    }
+
     relation
   }
 

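The relation-level checks above are what turn the mixed behaviors from the commit message into a single error. For instance, duplicate partition columns discovered from directory paths are now rejected at load time; an assumed reproduction, following the `DataFrameReaderWriterSuite` test added below:

```
scala> Seq(1).toDF("col").write.parquet("/tmp/part/a=1/A=1")
scala> spark.read.parquet("/tmp/part").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the partition schema: `a`
```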
http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 0031567..c1bcfb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -21,7 +21,6 @@ import java.io.IOException
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
@@ -64,13 +63,10 @@ case class InsertIntoHadoopFsRelationCommand(
     assert(children.length == 1)
 
     // Most formats don't do well with duplicate columns, so lets not allow that
-    if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) {
-      val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s): $duplicateColumns found, " +
-        "cannot save to file.")
-    }
+    SchemaUtils.checkSchemaColumnNameDuplication(
+      query.schema,
+      s"when inserting into $outputPath",
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     val fs = outputPath.getFileSystem(hadoopConf)

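The write path now reports duplicates with the same `Found duplicate column(s)` prefix as the read path. A sketch matching the updated `DataFrameSuite` assertions (the exact path rendering in the message is assumed):

```
scala> Seq((1, 2, 3)).toDF("column1", "column2", "column1").write.parquet("/tmp/out")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into file:/tmp/out: `column1`
```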
http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index f61c673..92358da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SchemaUtils
 
 // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
 
@@ -301,13 +302,8 @@ object PartitioningUtils {
       normalizedKey -> value
     }
 
-    if (normalizedPartSpec.map(_._1).distinct.length != normalizedPartSpec.length) {
-      val duplicateColumns = normalizedPartSpec.map(_._1).groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => x
-      }
-      throw new AnalysisException(s"Found duplicated columns in partition specification: " +
-        duplicateColumns.mkString(", "))
-    }
+    SchemaUtils.checkColumnNameDuplication(
+      normalizedPartSpec.map(_._1), "in the partition schema", resolver)
 
     normalizedPartSpec.toMap
   }

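As an assumed example of where this triggers: a partition spec whose keys collide after normalization (here under case-insensitive analysis) should now fail with the unified message:

```
scala> sql("ALTER TABLE t ADD PARTITION (p=1, P=2)")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the partition schema: `p`
```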
http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 55b2539..bbe9024f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator
 
@@ -749,14 +750,8 @@ object JdbcUtils extends Logging {
     val nameEquality = df.sparkSession.sessionState.conf.resolver
 
     // checks duplicate columns in the user specified column types.
-    userSchema.fieldNames.foreach { col =>
-      val duplicatesCols = userSchema.fieldNames.filter(nameEquality(_, col))
-      if (duplicatesCols.size >= 2) {
-        throw new AnalysisException(
-          "Found duplicate column(s) in createTableColumnTypes option value: " +
-            duplicatesCols.mkString(", "))
-      }
-    }
+    SchemaUtils.checkColumnNameDuplication(
+      userSchema.map(_.name), "in the createTableColumnTypes option value", nameEquality)
 
     // checks if user specified column names exist in the DataFrame schema
     userSchema.fieldNames.foreach { col =>

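A sketch based on the `JDBCWriteSuite` change below (`df`, `url`, and `properties` are placeholders):

```
df.write
  .option("createTableColumnTypes", "name CHAR(20), NaMe VARCHAR(32)")
  .jdbc(url, "TEST.USERDBTYPETEST", properties)
// org.apache.spark.sql.AnalysisException:
//   Found duplicate column(s) in the createTableColumnTypes option value: `name`
```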
http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 5a92a71..8b7c270 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -59,9 +59,7 @@ abstract class JsonDataSource extends Serializable {
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): Option[StructType] = {
     if (inputPaths.nonEmpty) {
-      val jsonSchema = infer(sparkSession, inputPaths, parsedOptions)
-      checkConstraints(jsonSchema)
-      Some(jsonSchema)
+      Some(infer(sparkSession, inputPaths, parsedOptions))
     } else {
       None
     }
@@ -71,17 +69,6 @@ abstract class JsonDataSource extends Serializable {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType
-
-  /** Constraints to be imposed on schema to be stored. */
-  private def checkConstraints(schema: StructType): Unit = {
-    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
-      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
-        s"cannot save to JSON format")
-    }
-  }
 }
 
 object JsonDataSource {

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 3f4a785..41d40aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.InsertableRelation
 import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.util.SchemaUtils
 
 /**
  * Try to replaces [[UnresolvedRelation]]s if the plan is for direct query on files.
@@ -222,12 +223,10 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
   }
 
   private def normalizeCatalogTable(schema: StructType, table: CatalogTable): CatalogTable = {
-    val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
-      schema.map(_.name)
-    } else {
-      schema.map(_.name.toLowerCase)
-    }
-    checkDuplication(columnNames, "table definition of " + table.identifier)
+    SchemaUtils.checkSchemaColumnNameDuplication(
+      schema,
+      "in the table definition of " + table.identifier,
+      sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
     val normalizedPartCols = normalizePartitionColumns(schema, table)
     val normalizedBucketSpec = normalizeBucketSpec(schema, table)
@@ -253,7 +252,10 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
       partCols = table.partitionColumnNames,
       resolver = sparkSession.sessionState.conf.resolver)
 
-    checkDuplication(normalizedPartitionCols, "partition")
+    SchemaUtils.checkColumnNameDuplication(
+      normalizedPartitionCols,
+      "in the partition schema",
+      sparkSession.sessionState.conf.resolver)
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
       if (DDLUtils.isHiveTable(table)) {
@@ -283,8 +285,15 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
           tableCols = schema.map(_.name),
           bucketSpec = bucketSpec,
           resolver = sparkSession.sessionState.conf.resolver)
-        checkDuplication(normalizedBucketSpec.bucketColumnNames, "bucket")
-        checkDuplication(normalizedBucketSpec.sortColumnNames, "sort")
+
+        SchemaUtils.checkColumnNameDuplication(
+          normalizedBucketSpec.bucketColumnNames,
+          "in the bucket definition",
+          sparkSession.sessionState.conf.resolver)
+        SchemaUtils.checkColumnNameDuplication(
+          normalizedBucketSpec.sortColumnNames,
+          "in the sort definition",
+          sparkSession.sessionState.conf.resolver)
 
         normalizedBucketSpec.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
           case dt if RowOrdering.isOrderable(dt) => // OK
@@ -297,15 +306,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     }
   }
 
-  private def checkDuplication(colNames: Seq[String], colType: String): Unit = {
-    if (colNames.distinct.length != colNames.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => x
-      }
-      failAnalysis(s"Found duplicate column(s) in $colType: ${duplicateColumns.mkString(", ")}")
-    }
-  }
-
   private def failAnalysis(msg: String) = throw new AnalysisException(msg)
 }
 

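These changes replace the local `checkDuplication` helper with the shared utility, so DDL-level duplicates produce the same message family; for example, mirroring the updated `DDLSuite` tests below:

```
scala> sql("CREATE TABLE t(a INT) USING parquet CLUSTERED BY (a, a) INTO 2 BUCKETS")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the bucket definition: `a`
```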
http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b2219b4..a5a2e1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1189,7 +1189,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1")
         .write.format("parquet").save("temp")
     }
-    assert(e.getMessage.contains("Duplicate column(s)"))
+    assert(e.getMessage.contains("Found duplicate column(s) when inserting into"))
     assert(e.getMessage.contains("column1"))
     assert(!e.getMessage.contains("column2"))
 
@@ -1199,7 +1199,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         .toDF("column1", "column2", "column3", "column1", "column3")
         .write.format("json").save("temp")
     }
-    assert(f.getMessage.contains("Duplicate column(s)"))
+    assert(f.getMessage.contains("Found duplicate column(s) when inserting into"))
     assert(f.getMessage.contains("column1"))
     assert(f.getMessage.contains("column3"))
     assert(!f.getMessage.contains("column2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5c40d8b..5c0a6aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -436,16 +436,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   test("create table - duplicate column names in the table definition") {
-    val e = intercept[AnalysisException] {
-      sql("CREATE TABLE tbl(a int, a string) USING json")
-    }
-    assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
-
-    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
-      val e2 = intercept[AnalysisException] {
-        sql("CREATE TABLE tbl(a int, A string) USING json")
+    Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        val errMsg = intercept[AnalysisException] {
+          sql(s"CREATE TABLE t($c0 INT, $c1 INT) USING parquet")
+        }.getMessage
+        assert(errMsg.contains("Found duplicate column(s) in the table definition of `t`"))
       }
-      assert(e2.message == "Found duplicate column(s) in table definition of `tbl`: a")
     }
   }
 
@@ -466,17 +463,33 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   test("create table - column repeated in partition columns") {
-    val e = intercept[AnalysisException] {
-      sql("CREATE TABLE tbl(a int) USING json PARTITIONED BY (a, a)")
+    Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        val errMsg = intercept[AnalysisException] {
+          sql(s"CREATE TABLE t($c0 INT) USING parquet PARTITIONED BY ($c0, $c1)")
+        }.getMessage
+        assert(errMsg.contains("Found duplicate column(s) in the partition schema"))
+      }
     }
-    assert(e.message == "Found duplicate column(s) in partition: a")
   }
 
-  test("create table - column repeated in bucket columns") {
-    val e = intercept[AnalysisException] {
-      sql("CREATE TABLE tbl(a int) USING json CLUSTERED BY (a, a) INTO 4 BUCKETS")
+  test("create table - column repeated in bucket/sort columns") {
+    Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        var errMsg = intercept[AnalysisException] {
+          sql(s"CREATE TABLE t($c0 INT) USING parquet CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS")
+        }.getMessage
+        assert(errMsg.contains("Found duplicate column(s) in the bucket definition"))
+
+        errMsg = intercept[AnalysisException] {
+          sql(s"""
+              |CREATE TABLE t($c0 INT, col INT) USING parquet CLUSTERED BY (col)
+              |  SORTED BY ($c0, $c1) INTO 2 BUCKETS
+             """.stripMargin)
+        }.getMessage
+        assert(errMsg.contains("Found duplicate column(s) in the sort definition"))
+      }
     }
-    assert(e.message == "Found duplicate column(s) in bucket: a")
   }
 
   test("Refresh table after changing the data source table partitioning") {
@@ -528,6 +541,17 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("create view - duplicate column names in the view definition") {
+    Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        val errMsg = intercept[AnalysisException] {
+          sql(s"CREATE VIEW t AS SELECT * FROM VALUES (1, 1) AS t($c0, $c1)")
+        }.getMessage
+        assert(errMsg.contains("Found duplicate column(s) in the view definition"))
+      }
+    }
+  }
+
   test("Alter/Describe Database") {
     val catalog = spark.sessionState.catalog
     val databaseNames = Seq("db1", "`database`")

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 92f50a0..2334d5a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Date, DriverManager, Timestamp}
+import java.sql.DriverManager
 import java.util.Properties
 
 import scala.collection.JavaConverters.propertiesAsScalaMapConverter
@@ -479,7 +479,7 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
           .jdbc(url1, "TEST.USERDBTYPETEST", properties)
       }.getMessage()
       assert(msg.contains(
-        "Found duplicate column(s) in createTableColumnTypes option value: name, NaMe"))
+        "Found duplicate column(s) in the createTableColumnTypes option value: `name`"))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 0f97fd7..308c507 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -21,11 +21,12 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.test.SharedSQLContext
 
-class ResolvedDataSourceSuite extends SparkFunSuite {
+class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
   private def getProvidingClass(name: String): Class[_] =
     DataSource(
-      sparkSession = null,
+      sparkSession = spark,
       className = name,
       options = Map(DateTimeUtils.TIMEZONE_OPTION -> DateTimeUtils.defaultTimeZone().getID)
     ).providingClass

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index bb6a278..6676099 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
@@ -352,4 +353,40 @@ class FileStreamSinkSuite extends StreamTest {
     assertAncestorIsNotMetadataDirectory(s"/a/b/c")
     assertAncestorIsNotMetadataDirectory(s"/a/b/c/${FileStreamSink.metadataDir}extra")
   }
+
+  test("SPARK-20460 Check name duplication in schema") {
+    Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        val inputData = MemoryStream[(Int, Int)]
+        val df = inputData.toDF()
+
+        val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+        val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+        var query: StreamingQuery = null
+        try {
+          query =
+            df.writeStream
+              .option("checkpointLocation", checkpointDir)
+              .format("json")
+              .start(outputDir)
+
+          inputData.addData((1, 1))
+
+          failAfter(streamingTimeout) {
+            query.processAllAvailable()
+          }
+        } finally {
+          if (query != null) {
+            query.stop()
+          }
+        }
+
+        val errorMsg = intercept[AnalysisException] {
+          spark.read.schema(s"$c0 INT, $c1 INT").json(outputDir).as[(Int, Int)]
+        }.getMessage
+        assert(errorMsg.contains("Found duplicate column(s) in the data schema: "))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 306aecb..569bac1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -687,4 +688,91 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     testRead(spark.read.schema(userSchemaString).text(dir, dir), data ++ data, userSchema)
     testRead(spark.read.schema(userSchemaString).text(Seq(dir, dir): _*), data ++ data, userSchema)
   }
+
+  test("SPARK-20460 Check name duplication in buckets") {
+    Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        var errorMsg = intercept[AnalysisException] {
+          Seq((1, 1)).toDF("col", c0).write.bucketBy(2, c0, c1).saveAsTable("t")
+        }.getMessage
+        assert(errorMsg.contains("Found duplicate column(s) in the bucket definition"))
+
+        errorMsg = intercept[AnalysisException] {
+          Seq((1, 1)).toDF("col", c0).write.bucketBy(2, "col").sortBy(c0, c1).saveAsTable("t")
+        }.getMessage
+        assert(errorMsg.contains("Found duplicate column(s) in the sort definition"))
+      }
+    }
+  }
+
+  test("SPARK-20460 Check name duplication in schema") {
+    def checkWriteDataColumnDuplication(
+        format: String, colName0: String, colName1: String, tempDir: File): Unit = {
+      val errorMsg = intercept[AnalysisException] {
+        Seq((1, 1)).toDF(colName0, colName1).write.format(format).mode("overwrite")
+          .save(tempDir.getAbsolutePath)
+      }.getMessage
+      assert(errorMsg.contains("Found duplicate column(s) when inserting into"))
+    }
+
+    def checkReadUserSpecifiedDataColumnDuplication(
+        df: DataFrame, format: String, colName0: String, colName1: String, tempDir: File): Unit = {
+      val testDir = Utils.createTempDir(tempDir.getAbsolutePath)
+      df.write.format(format).mode("overwrite").save(testDir.getAbsolutePath)
+      val errorMsg = intercept[AnalysisException] {
+        spark.read.format(format).schema(s"$colName0 INT, $colName1 INT")
+          .load(testDir.getAbsolutePath)
+      }.getMessage
+      assert(errorMsg.contains("Found duplicate column(s) in the data schema:"))
+    }
+
+    def checkReadPartitionColumnDuplication(
+        format: String, colName0: String, colName1: String, tempDir: File): Unit = {
+      val testDir = Utils.createTempDir(tempDir.getAbsolutePath)
+      Seq(1).toDF("col").write.format(format).mode("overwrite")
+        .save(s"${testDir.getAbsolutePath}/$colName0=1/$colName1=1")
+      val errorMsg = intercept[AnalysisException] {
+        spark.read.format(format).load(testDir.getAbsolutePath)
+      }.getMessage
+      assert(errorMsg.contains("Found duplicate column(s) in the partition schema:"))
+    }
+
+    Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        withTempDir { src =>
+          // Check CSV format
+          checkWriteDataColumnDuplication("csv", c0, c1, src)
+          checkReadUserSpecifiedDataColumnDuplication(
+            Seq((1, 1)).toDF("c0", "c1"), "csv", c0, c1, src)
+          // If `inferSchema` is true, a CSV format is duplicate-safe (See SPARK-16896)
+          var testDir = Utils.createTempDir(src.getAbsolutePath)
+          Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text(testDir.getAbsolutePath)
+          val df = spark.read.format("csv").option("inferSchema", true).option("header", true)
+            .load(testDir.getAbsolutePath)
+          checkAnswer(df, Row(1, 1))
+          checkReadPartitionColumnDuplication("csv", c0, c1, src)
+
+          // Check JSON format
+          checkWriteDataColumnDuplication("json", c0, c1, src)
+          checkReadUserSpecifiedDataColumnDuplication(
+            Seq((1, 1)).toDF("c0", "c1"), "json", c0, c1, src)
+          // Inferred schema cases
+          testDir = Utils.createTempDir(src.getAbsolutePath)
+          Seq(s"""{"$c0":3, "$c1":5}""").toDF().write.mode("overwrite")
+            .text(testDir.getAbsolutePath)
+          val errorMsg = intercept[AnalysisException] {
+            spark.read.format("json").option("inferSchema", true).load(testDir.getAbsolutePath)
+          }.getMessage
+          assert(errorMsg.contains("Found duplicate column(s) in the data schema:"))
+          checkReadPartitionColumnDuplication("json", c0, c1, src)
+
+          // Check Parquet format
+          checkWriteDataColumnDuplication("parquet", c0, c1, src)
+          checkReadUserSpecifiedDataColumnDuplication(
+            Seq((1, 1)).toDF("c0", "c1"), "parquet", c0, c1, src)
+          checkReadPartitionColumnDuplication("parquet", c0, c1, src)
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/647963a2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 31fa3d2..12daf3a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -345,7 +345,7 @@ class HiveDDLSuite
     val e = intercept[AnalysisException] {
       sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
     }
-    assert(e.message == "Found duplicate column(s) in table definition of `default`.`tbl`: a")
+    assert(e.message == "Found duplicate column(s) in the table definition of `default`.`tbl`: `a`")
   }
 
   test("add/drop partition with location - managed table") {

