You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/07/07 01:39:54 UTC
spark git commit: [SPARK-8072] [SQL] Better AnalysisException for
writing DataFrame with identically named columns
Repository: spark
Updated Branches:
refs/heads/master 7b467cc93 -> 09a06418d
[SPARK-8072] [SQL] Better AnalysisException for writing DataFrame with identically named columns
Adds a function, checkConstraints, which checks the constraints that a DataFrame (or its schema) must satisfy. The function is called before the DataFrame is stored to external storage, and it is added to the corresponding data source implementations (JSON and Parquet).
cc rxin marmbrus
Author: animesh <an...@apache.spark>
This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <mi...@databricks.com>
Closes #7013 from animeshbaranawal/8072 and squashes the following commits:
f70dd0e [animesh] Change IO exception to Analysis Exception
fd45e1b [animesh] 8072: Fix Style Issues
a8a964f [animesh] 8072: Improving on previous commits
3cc4d2c [animesh] Fix Style Issues
1a89115 [animesh] Fix Style Issues
98b4399 [animesh] 8072 : Moved the exception handling to ResolvedDataSource specific to parquet format
7c3d928 [animesh] 8072: Adding check to DataFrameWriter.scala
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09a06418
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09a06418
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09a06418
Branch: refs/heads/master
Commit: 09a06418debc25da0191d98798f7c5016d39be91
Parents: 7b467cc
Author: animesh <an...@apache.spark>
Authored: Mon Jul 6 16:39:49 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Jul 6 16:39:49 2015 -0700
----------------------------------------------------------------------
.../apache/spark/sql/json/JSONRelation.scala | 31 ++++++++++++++++++++
.../apache/spark/sql/parquet/newParquet.scala | 19 +++++++++++-
.../org/apache/spark/sql/DataFrameSuite.scala | 24 +++++++++++++++
3 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/09a06418/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 69bf13e..2361d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -22,6 +22,7 @@ import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -37,6 +38,17 @@ private[sql] class DefaultSource
parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
}
+ /**
+  * Constraints to be imposed on a dataframe before it is stored.
+  *
+  * Currently the only constraint is that column names must be unique.
+  *
+  * @param data the dataframe about to be saved
+  * @throws AnalysisException if two or more columns of `data` share a name; the message
+  *                           lists every duplicated name once, quoted
+  */
+ private def checkConstraints(data: DataFrame): Unit = {
+   val fieldNames = data.schema.fieldNames
+   // Removing one occurrence of each distinct name leaves only the repeated occurrences.
+   // De-duplicating those yields each offending name once, in the order the repeats
+   // appear in the schema (rather than in hash-map iteration order).
+   val duplicates = fieldNames.diff(fieldNames.distinct).distinct
+   if (duplicates.nonEmpty) {
+     val duplicateColumns = duplicates.map(name => "\"" + name + "\"").mkString(", ")
+     throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+       s"cannot save to JSON format")
+   }
+ }
+
/** Returns a new base relation with the parameters. */
override def createRelation(
sqlContext: SQLContext,
@@ -63,6 +75,10 @@ private[sql] class DefaultSource
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
+ // check if dataframe satisfies the constraints
+ // before moving forward
+ checkConstraints(data)
+
val path = checkPath(parameters)
val filesystemPath = new Path(path)
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
@@ -130,6 +146,17 @@ private[sql] class JSONRelation(
samplingRatio,
userSpecifiedSchema)(sqlContext)
+ /**
+  * Constraints to be imposed on a dataframe before it is stored.
+  *
+  * Currently the only constraint is that column names must be unique.
+  *
+  * @param data the dataframe about to be inserted
+  * @throws AnalysisException if two or more columns of `data` share a name; the message
+  *                           lists every duplicated name once, quoted
+  */
+ private def checkConstraints(data: DataFrame): Unit = {
+   val fieldNames = data.schema.fieldNames
+   // Removing one occurrence of each distinct name leaves only the repeated occurrences.
+   // De-duplicating those yields each offending name once, in the order the repeats
+   // appear in the schema (rather than in hash-map iteration order).
+   val duplicates = fieldNames.diff(fieldNames.distinct).distinct
+   if (duplicates.nonEmpty) {
+     val duplicateColumns = duplicates.map(name => "\"" + name + "\"").mkString(", ")
+     throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+       s"cannot save to JSON format")
+   }
+ }
+
private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI
override val needConversion: Boolean = false
@@ -178,6 +205,10 @@ private[sql] class JSONRelation(
}
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ // check if dataframe satisfies constraints
+ // before moving forward
+ checkConstraints(data)
+
val filesystemPath = path match {
case Some(p) => new Path(p)
case None =>
http://git-wip-us.apache.org/repos/asf/spark/blob/09a06418/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 5ac3e9a..6bc69c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -164,7 +164,24 @@ private[sql] class ParquetRelation2(
}
}
- override def dataSchema: StructType = maybeDataSchema.getOrElse(metadataCache.dataSchema)
+ /**
+  * Constraints on the schema of a dataframe to be stored.
+  *
+  * Currently the only constraint is that column names must be unique.
+  *
+  * @param schema the schema to validate
+  * @throws AnalysisException if two or more fields of `schema` share a name; the message
+  *                           lists every duplicated name once, quoted
+  */
+ private def checkConstraints(schema: StructType): Unit = {
+   val fieldNames = schema.fieldNames
+   // Removing one occurrence of each distinct name leaves only the repeated occurrences.
+   // De-duplicating those yields each offending name once, in the order the repeats
+   // appear in the schema (rather than in hash-map iteration order).
+   val duplicates = fieldNames.diff(fieldNames.distinct).distinct
+   if (duplicates.nonEmpty) {
+     val duplicateColumns = duplicates.map(name => "\"" + name + "\"").mkString(", ")
+     throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
+       s"cannot save to parquet format")
+   }
+ }
+
+ /**
+  * Schema of the data columns: the value held in `maybeDataSchema` when it is defined,
+  * otherwise the schema provided by `metadataCache`.
+  *
+  * The resolved schema is passed through `checkConstraints` before it is returned.
+  * NOTE(review): the check therefore runs on every access of this member, not only when
+  * saving -- confirm that this is intended for read paths as well.
+  */
+ override def dataSchema: StructType = {
+   val resolved = maybeDataSchema.getOrElse(metadataCache.dataSchema)
+   // Validate first so an invalid schema is never handed to the caller.
+   checkConstraints(resolved)
+   resolved
+ }
override private[sql] def refresh(): Unit = {
super.refresh()
http://git-wip-us.apache.org/repos/asf/spark/blob/09a06418/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index afb1cf5..f592a99 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -737,4 +737,28 @@ class DataFrameSuite extends QueryTest {
df.col("")
df.col("t.``")
}
+
+ test("SPARK-8072: Better Exception for Duplicate Columns") {
+   // Saving a DataFrame whose column names are not unique must fail with an
+   // AnalysisException that names the format and exactly the duplicated columns.
+   // NOTE(review): "temp" is a relative path; if the check ever regresses, the write
+   // would leave output in the working directory -- consider a dedicated temp dir.
+
+   // only one duplicate column present
+   val e = intercept[org.apache.spark.sql.AnalysisException] {
+     // The write is evaluated only for its failure, so its Unit result is not bound.
+     Seq((1, 2, 3), (2, 3, 4), (3, 4, 5)).toDF("column1", "column2", "column1")
+       .write.format("parquet").save("temp")
+   }
+   assert(e.getMessage.contains("Duplicate column(s)"))
+   assert(e.getMessage.contains("parquet"))
+   assert(e.getMessage.contains("column1"))
+   // "column2" is unique and must not be reported
+   assert(!e.getMessage.contains("column2"))
+
+   // multiple duplicate columns present
+   val f = intercept[org.apache.spark.sql.AnalysisException] {
+     Seq((1, 2, 3, 4, 5), (2, 3, 4, 5, 6), (3, 4, 5, 6, 7))
+       .toDF("column1", "column2", "column3", "column1", "column3")
+       .write.format("json").save("temp")
+   }
+   assert(f.getMessage.contains("Duplicate column(s)"))
+   assert(f.getMessage.contains("JSON"))
+   assert(f.getMessage.contains("column1"))
+   assert(f.getMessage.contains("column3"))
+   // "column2" is unique and must not be reported
+   assert(!f.getMessage.contains("column2"))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org