Posted to commits@spark.apache.org by ma...@apache.org on 2015/12/16 01:56:03 UTC

spark git commit: [SPARK-12271][SQL] Improve error message when Dataset.as[ ] has incompatible schemas.

Repository: spark
Updated Branches:
  refs/heads/master b24c12d73 -> 86ea64dd1


[SPARK-12271][SQL] Improve error message when Dataset.as[ ] has incompatible schemas.

Author: Nong Li <no...@databricks.com>

Closes #10260 from nongli/spark-11271.
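
For context, a minimal sketch of the user-visible behavior this patch
improves, mirroring the test added to DatasetSuite below (ClassData and
ClassData2 are taken from that suite; a Spark 1.6-era SQLContext with its
implicits in scope is assumed):

    import sqlContext.implicits._  // assumed: Spark 1.6-era shell/session

    case class ClassData(a: String, b: Int)
    case class ClassData2(c: String, d: Int)

    val ds = Seq(ClassData("a", 1)).toDS()

    // ClassData2 expects a column 'c' that the Dataset's schema (a, b)
    // does not have. With this patch, analysis fails fast with an
    // AnalysisException whose message names the missing column:
    //   cannot resolve 'c' given input columns a, b
    ds.as[ClassData2].collect()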


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86ea64dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86ea64dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86ea64dd

Branch: refs/heads/master
Commit: 86ea64dd146757c8f997d05fb5bb44f6aa58515c
Parents: b24c12d
Author: Nong Li <no...@databricks.com>
Authored: Tue Dec 15 16:55:58 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Dec 15 16:55:58 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala |  2 +-
 .../spark/sql/catalyst/encoders/ExpressionEncoder.scala |  1 +
 .../apache/spark/sql/catalyst/expressions/objects.scala | 12 +++++++-----
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala  | 10 +++++++++-
 4 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/86ea64dd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 9013fd0..ecff860 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -184,7 +184,7 @@ object ScalaReflection extends ScalaReflection {
         val TypeRef(_, _, Seq(optType)) = t
         val className = getClassNameFromType(optType)
         val newTypePath = s"""- option value class: "$className"""" +: walkedTypePath
-        WrapOption(constructorFor(optType, path, newTypePath))
+        WrapOption(constructorFor(optType, path, newTypePath), dataTypeFor(optType))
 
       case t if t <:< localTypeOf[java.lang.Integer] =>
         val boxedType = classOf[java.lang.Integer]

http://git-wip-us.apache.org/repos/asf/spark/blob/86ea64dd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 3e8420e..363178b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -251,6 +251,7 @@ case class ExpressionEncoder[T](
 
     val plan = Project(Alias(unbound, "")() :: Nil, LocalRelation(schema))
     val analyzedPlan = SimpleAnalyzer.execute(plan)
+    SimpleAnalyzer.checkAnalysis(analyzedPlan)
     val optimizedPlan = SimplifyCasts(analyzedPlan)
 
     // In order to construct instances of inner classes (for example those declared in a REPL cell),
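
A note on this hunk: adding the checkAnalysis call means the encoder's
resolution step now validates the analyzed plan, so an unresolved column
(such as 'c' in the example above) surfaces as an AnalysisException at
resolve time instead of a more obscure failure later, during code
generation or execution.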

http://git-wip-us.apache.org/repos/asf/spark/blob/86ea64dd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
index 96bc4fe..10ec75e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
@@ -23,11 +23,9 @@ import scala.reflect.ClassTag
 import org.apache.spark.SparkConf
 import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
-import org.apache.spark.sql.catalyst.plans.logical.{Project, LocalRelation}
-import org.apache.spark.sql.catalyst.util.GenericArrayData
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
 import org.apache.spark.sql.types._
 
 /**
@@ -295,13 +293,17 @@ case class UnwrapOption(
  * Converts the result of evaluating `child` into an option, checking both the isNull bit and
  * (in the case of reference types) equality with null.
  * @param child The expression to evaluate and wrap.
+ * @param optType The type of this option.
  */
-case class WrapOption(child: Expression) extends UnaryExpression {
+case class WrapOption(child: Expression, optType: DataType)
+  extends UnaryExpression with ExpectsInputTypes {
 
   override def dataType: DataType = ObjectType(classOf[Option[_]])
 
   override def nullable: Boolean = true
 
+  override def inputTypes: Seq[AbstractDataType] = optType :: Nil
+
   override def eval(input: InternalRow): Any =
     throw new UnsupportedOperationException("Only code-generated evaluation is supported")
 

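As a hedged illustration of the mechanism in this hunk (a simplified
sketch, not the actual Catalyst source; all names below are stand-ins):
an expression that declares its expected input types lets the analyzer
report a targeted type error before any code generation runs.

    // Sketch only: simplified stand-ins for Catalyst's types.
    sealed trait DataType
    case object StringType extends DataType
    case object IntegerType extends DataType

    trait ExpectsInputTypes {
      def inputTypes: Seq[DataType]      // what the expression requires
      def childDataTypes: Seq[DataType]  // what the plan actually provides

      // Analysis-time check: report the first mismatch with a readable
      // message instead of failing later in generated code.
      def checkInputDataTypes(): Either[String, Unit] =
        childDataTypes.zip(inputTypes).zipWithIndex.collectFirst {
          case ((actual, expected), i) if actual != expected =>
            s"argument ${i + 1} requires $expected but got $actual"
        }.toLeft(())
    }

In the real change, WrapOption's new optType parameter feeds this kind of
inputTypes declaration, which is what produces the clearer error checked
by the new DatasetSuite test.
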
http://git-wip-us.apache.org/repos/asf/spark/blob/86ea64dd/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 542e4d6..8f8db31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -481,10 +481,18 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val ds = Seq(2 -> 2.toByte, 3 -> 3.toByte).toDF("a", "b").as[ClassData]
     assert(ds.collect().toSeq == Seq(ClassData("2", 2), ClassData("3", 3)))
   }
-}
 
+  test("verify mismatching field names fail with a good error") {
+    val ds = Seq(ClassData("a", 1)).toDS()
+    val e = intercept[AnalysisException] {
+      ds.as[ClassData2].collect()
+    }
+    assert(e.getMessage.contains("cannot resolve 'c' given input columns a, b"), e.getMessage)
+  }
+}
 
 case class ClassData(a: String, b: Int)
+case class ClassData2(c: String, d: Int)
 case class ClassNullableData(a: String, b: Integer)
 
 /**

