Posted to commits@spark.apache.org by ma...@apache.org on 2015/07/07 01:39:54 UTC

spark git commit: [SPARK-8072] [SQL] Better AnalysisException for writing DataFrame with identically named columns

Repository: spark
Updated Branches:
  refs/heads/master 7b467cc93 -> 09a06418d


[SPARK-8072] [SQL] Better AnalysisException for writing DataFrame with identically named columns

Adds a checkConstraints function that validates the constraints imposed on a DataFrame (or its schema) before the DataFrame is stored to external storage. The function is called from the corresponding data source APIs, so writing a DataFrame with identically named columns now fails with a clear AnalysisException instead of an opaque error later in the write path.
cc rxin marmbrus
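
For illustration, a minimal sketch of the user-facing behavior this patch introduces (the output path and the `sqlContext` value are placeholders, not part of the patch):

    // Scala sketch, assuming a SQLContext named `sqlContext` with its implicits in scope
    import sqlContext.implicits._

    // A DataFrame with two identically named columns is legal to construct...
    val df = Seq((1, 2), (3, 4)).toDF("id", "id")

    // ...but saving it now fails fast with
    //   AnalysisException: Duplicate column(s) : "id" found, cannot save to JSON format
    // rather than an unrelated IO-level error at write time.
    df.write.format("json").save("/tmp/out")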

Author: animesh <an...@apache.spark>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <mi...@databricks.com>

Closes #7013 from animeshbaranawal/8072 and squashes the following commits:

f70dd0e [animesh] Change IO exception to Analysis Exception
fd45e1b [animesh] 8072: Fix Style Issues
a8a964f [animesh] 8072: Improving on previous commits
3cc4d2c [animesh] Fix Style Issues
1a89115 [animesh] Fix Style Issues
98b4399 [animesh] 8072 : Moved the exception handling to ResolvedDataSource specific to parquet format
7c3d928 [animesh] 8072: Adding check to DataFrameWriter.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09a06418
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09a06418
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09a06418

Branch: refs/heads/master
Commit: 09a06418debc25da0191d98798f7c5016d39be91
Parents: 7b467cc
Author: animesh <an...@apache.spark>
Authored: Mon Jul 6 16:39:49 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Jul 6 16:39:49 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/json/JSONRelation.scala    | 31 ++++++++++++++++++++
 .../apache/spark/sql/parquet/newParquet.scala   | 19 +++++++++++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 24 +++++++++++++++
 3 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/09a06418/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 69bf13e..2361d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -22,6 +22,7 @@ import java.io.IOException
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -37,6 +38,17 @@ private[sql] class DefaultSource
     parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
   }
 
+  /** Constraints to be imposed on dataframe to be stored. */
+  private def checkConstraints(data: DataFrame): Unit = {
+    if (data.schema.fieldNames.length != data.schema.fieldNames.distinct.length) {
+      val duplicateColumns = data.schema.fieldNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      }.mkString(", ")
+      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+        s"cannot save to JSON format")
+    }
+  }
+
   /** Returns a new base relation with the parameters. */
   override def createRelation(
       sqlContext: SQLContext,
@@ -63,6 +75,10 @@ private[sql] class DefaultSource
       mode: SaveMode,
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
+    // check if dataframe satisfies the constraints
+    // before moving forward
+    checkConstraints(data)
+
     val path = checkPath(parameters)
     val filesystemPath = new Path(path)
     val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
@@ -130,6 +146,17 @@ private[sql] class JSONRelation(
       samplingRatio,
       userSpecifiedSchema)(sqlContext)
 
+  /** Constraints to be imposed on dataframe to be stored. */
+  private def checkConstraints(data: DataFrame): Unit = {
+    if (data.schema.fieldNames.length != data.schema.fieldNames.distinct.length) {
+      val duplicateColumns = data.schema.fieldNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      }.mkString(", ")
+      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+        s"cannot save to JSON format")
+    }
+  }
+
   private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI
 
   override val needConversion: Boolean = false
@@ -178,6 +205,10 @@ private[sql] class JSONRelation(
   }
 
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    // check if dataframe satisfies constraints
+    // before moving forward
+    checkConstraints(data)
+
     val filesystemPath = path match {
       case Some(p) => new Path(p)
       case None =>
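
The duplicate detection in checkConstraints above boils down to grouping the schema's field names with groupBy(identity) and keeping the groups with more than one member. A standalone sketch of the same idiom, outside of any relation class:

    // Standalone Scala sketch of the duplicate-detection idiom used in checkConstraints
    val fieldNames = Array("column1", "column2", "column1")

    // Group identical names together, then keep only names occurring more than once
    val duplicateColumns = fieldNames.groupBy(identity).collect {
      case (name, occurrences) if occurrences.length > 1 => "\"" + name + "\""
    }.mkString(", ")

    // duplicateColumns == "\"column1\""
    // The cheap length-vs-distinct check guards the more expensive grouping:
    assert(fieldNames.length != fieldNames.distinct.length)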

http://git-wip-us.apache.org/repos/asf/spark/blob/09a06418/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 5ac3e9a..6bc69c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -164,7 +164,24 @@ private[sql] class ParquetRelation2(
     }
   }
 
-  override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
+  /** Constraints on schema of dataframe to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+    if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+      val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => "\"" + x + "\""
+      }.mkString(", ")
+      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+        s"cannot save to parquet format")
+    }
+  }
+
+  override def dataSchema: StructType = {
+    val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema)
+    // check if schema satisfies the constraints
+    // before moving forward
+    checkConstraints(schema)
+    schema
+  }
 
   override private[sql] def refresh(): Unit = {
     super.refresh()

http://git-wip-us.apache.org/repos/asf/spark/blob/09a06418/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index afb1cf5..f592a99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -737,4 +737,28 @@ class DataFrameSuite extends QueryTest {
     df.col("")
     df.col("t.``")
   }
+
+  test("SPARK-8072: Better Exception for Duplicate Columns") {
+    // only one duplicate column present
+    val e = intercept[org.apache.spark.sql.AnalysisException] {
+      val df1 = Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1")
+                .write.format("parquet").save("temp")
+    }
+    assert(e.getMessage.contains("Duplicate column(s)"))
+    assert(e.getMessage.contains("parquet"))
+    assert(e.getMessage.contains("column1"))
+    assert(!e.getMessage.contains("column2"))
+
+    // multiple duplicate columns present
+    val f = intercept[org.apache.spark.sql.AnalysisException] {
+      val df2 = Seq((1, 2, 3, 4, 5), (2, 3, 4, 5, 6), (3, 4, 5, 6, 7))
+                .toDF("column1", "column2", "column3", "column1", "column3")
+                .write.format("json").save("temp")
+    }
+    assert(f.getMessage.contains("Duplicate column(s)"))
+    assert(f.getMessage.contains("JSON"))
+    assert(f.getMessage.contains("column1"))
+    assert(f.getMessage.contains("column3"))
+    assert(!f.getMessage.contains("column2"))
+  }
 }

