Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/03 09:48:01 UTC

spark git commit: [SPARK-7562][SPARK-6444][SQL] Improve error reporting for expression data type mismatch

Repository: spark
Updated Branches:
  refs/heads/master ce320cb2d -> d38cf217e


[SPARK-7562][SPARK-6444][SQL] Improve error reporting for expression data type mismatch

It seems hard to find a common pattern for checking types in `Expression`. Sometimes we know exactly which input types we need (for `And`, we need two booleans); sometimes we just have a rule (for `Add`, we need two numeric types that are equal). So I defined a general interface `checkInputDataTypes` in `Expression` that returns a `TypeCheckResult`. A `TypeCheckResult` tells us whether the expression passes the type check and, if not, what the type mismatch is.
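
As a rough, standalone sketch of the shape of this interface (simplified from
the TypeCheckResult.scala and Expression.scala changes below; the `Expr` trait
here is illustrative, not the real `Expression` class):

    // A type check either succeeds or fails with a message.
    sealed trait TypeCheckResult {
      def isSuccess: Boolean
      def isFailure: Boolean = !isSuccess
    }
    object TypeCheckResult {
      case object TypeCheckSuccess extends TypeCheckResult {
        def isSuccess: Boolean = true
      }
      case class TypeCheckFailure(message: String) extends TypeCheckResult {
        def isSuccess: Boolean = false
      }
    }

    // Expressions get a default check that always succeeds and can override
    // it with their own rule; `resolved` now also requires the check to pass.
    trait Expr {
      def childrenResolved: Boolean
      def checkInputDataTypes(): TypeCheckResult = TypeCheckResult.TypeCheckSuccess
      lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess
    }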

This PR mainly applies input type checking to arithmetic and predicate expressions.
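
For example, adding a string column to an int column now fails analysis with a
message like the following (hypothetical column names; the wording comes from
the templates in CheckAnalysis.scala and arithmetic.scala below):

    cannot resolve '(a + b)' due to data type mismatch:
    differing types in Add (StringType and IntegerType).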

TODO: apply the type checking interface to more expressions.

Author: Wenchen Fan <cl...@outlook.com>

Closes #6405 from cloud-fan/6444 and squashes the following commits:

b5ff31b [Wenchen Fan] address comments
b917275 [Wenchen Fan] rebase
39929d9 [Wenchen Fan] add todo
0808fd2 [Wenchen Fan] make constructor of TypeCheckResult private
3bee157 [Wenchen Fan] add decimal type coercion rule for binary comparison
8883025 [Wenchen Fan] apply type check interface to CaseWhen
cffb67c [Wenchen Fan] to have resolved call the data type check function
6eaadff [Wenchen Fan] add equal type constraint to EqualTo
3affbd8 [Wenchen Fan] more fixes
654d46a [Wenchen Fan] improve tests
e0a3628 [Wenchen Fan] improve error message
1524ff6 [Wenchen Fan] fix style
69ca3fe [Wenchen Fan] add error message and tests
c71d02c [Wenchen Fan] fix hive tests
6491721 [Wenchen Fan] use value class TypeCheckResult
7ae76b9 [Wenchen Fan] address comments
cb77e4f [Wenchen Fan] Improve error reporting for expression data type mismatch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d38cf217
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d38cf217
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d38cf217

Branch: refs/heads/master
Commit: d38cf217e0c6bfbf451c659675280b43a08bc70f
Parents: ce320cb
Author: Wenchen Fan <cl...@outlook.com>
Authored: Wed Jun 3 00:47:52 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jun 3 00:47:52 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkFunSuite.scala  |   4 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  12 +-
 .../catalyst/analysis/HiveTypeCoercion.scala    | 132 ++++----
 .../sql/catalyst/analysis/TypeCheckResult.scala |  45 +++
 .../sql/catalyst/expressions/Expression.scala   |  28 +-
 .../sql/catalyst/expressions/arithmetic.scala   | 308 ++++++++-----------
 .../catalyst/expressions/mathfuncs/binary.scala |  17 +-
 .../sql/catalyst/expressions/predicates.scala   | 226 +++++++-------
 .../sql/catalyst/optimizer/Optimizer.scala      |   4 +
 .../spark/sql/catalyst/util/DateUtils.scala     |   2 +-
 .../spark/sql/catalyst/util/TypeUtils.scala     |  56 ++++
 .../org/apache/spark/sql/types/DataType.scala   |   2 +-
 .../analysis/DecimalPrecisionSuite.scala        |   6 +-
 .../analysis/HiveTypeCoercionSuite.scala        |  15 +-
 .../ExpressionTypeCheckingSuite.scala           | 143 +++++++++
 .../org/apache/spark/sql/json/InferSchema.scala |   2 +-
 .../org/apache/spark/sql/json/JsonRDD.scala     |   2 +-
 17 files changed, 583 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 8cb3443..9be9db0 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -30,8 +30,8 @@ private[spark] abstract class SparkFunSuite extends FunSuite with Logging {
    * Log the suite name and the test name before and after each test.
    *
    * Subclasses should never override this method. If they wish to run
-   * custom code before and after each test, they should should mix in
-   * the {{org.scalatest.BeforeAndAfter}} trait instead.
+   * custom code before and after each test, they should mix in the
+   * {{org.scalatest.BeforeAndAfter}} trait instead.
    */
   final protected override def withFixture(test: NoArgTest): Outcome = {
     val testName = test.text

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 193dc6b..c0695ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -62,15 +62,17 @@ trait CheckAnalysis {
             val from = operator.inputSet.map(_.name).mkString(", ")
             a.failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from")
 
+          case e: Expression if e.checkInputDataTypes().isFailure =>
+            e.checkInputDataTypes() match {
+              case TypeCheckResult.TypeCheckFailure(message) =>
+                e.failAnalysis(
+                  s"cannot resolve '${e.prettyString}' due to data type mismatch: $message")
+            }
+
           case c: Cast if !c.resolved =>
             failAnalysis(
               s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")
 
-          case b: BinaryExpression if !b.resolved =>
-            failAnalysis(
-              s"invalid expression ${b.prettyString} " +
-              s"between ${b.left.dataType.simpleString} and ${b.right.dataType.simpleString}")
-
           case WindowExpression(UnresolvedWindowFunction(name, _), _) =>
             failAnalysis(
               s"Could not resolve window function '$name'. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index edcc918..b064600 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -41,7 +41,7 @@ object HiveTypeCoercion {
    * with primitive types, because in that case the precision and scale of the result depends on
    * the operation. Those rules are implemented in [[HiveTypeCoercion.DecimalPrecision]].
    */
-  val findTightestCommonType: (DataType, DataType) => Option[DataType] = {
+  val findTightestCommonTypeOfTwo: (DataType, DataType) => Option[DataType] = {
     case (t1, t2) if t1 == t2 => Some(t1)
     case (NullType, t1) => Some(t1)
     case (t1, NullType) => Some(t1)
@@ -57,6 +57,17 @@ object HiveTypeCoercion {
 
     case _ => None
   }
+
+  /**
+   * Find the tightest common type of a set of types by continuously applying
+   * `findTightestCommonTypeOfTwo` on these types.
+   */
+  private def findTightestCommonType(types: Seq[DataType]) = {
+    types.foldLeft[Option[DataType]](Some(NullType))((r, c) => r match {
+      case None => None
+      case Some(d) => findTightestCommonTypeOfTwo(d, c)
+    })
+  }
 }
 
 /**
@@ -180,7 +191,7 @@ trait HiveTypeCoercion {
 
           case (l, r) if l.dataType != r.dataType =>
             logDebug(s"Resolving mismatched union input ${l.dataType}, ${r.dataType}")
-            findTightestCommonType(l.dataType, r.dataType).map { widestType =>
+            findTightestCommonTypeOfTwo(l.dataType, r.dataType).map { widestType =>
               val newLeft =
                 if (l.dataType == widestType) l else Alias(Cast(l, widestType), l.name)()
               val newRight =
@@ -217,7 +228,7 @@ trait HiveTypeCoercion {
         case e if !e.childrenResolved => e
 
         case b: BinaryExpression if b.left.dataType != b.right.dataType =>
-          findTightestCommonType(b.left.dataType, b.right.dataType).map { widestType =>
+          findTightestCommonTypeOfTwo(b.left.dataType, b.right.dataType).map { widestType =>
             val newLeft =
               if (b.left.dataType == widestType) b.left else Cast(b.left, widestType)
             val newRight =
@@ -441,21 +452,18 @@ trait HiveTypeCoercion {
             DecimalType(min(p1 - s1, p2 - s2) + max(s1, s2), max(s1, s2))
           )
 
-        case LessThan(e1 @ DecimalType.Expression(p1, s1),
-                      e2 @ DecimalType.Expression(p2, s2)) if p1 != p2 || s1 != s2 =>
-          LessThan(Cast(e1, DecimalType.Unlimited), Cast(e2, DecimalType.Unlimited))
-
-        case LessThanOrEqual(e1 @ DecimalType.Expression(p1, s1),
-                             e2 @ DecimalType.Expression(p2, s2)) if p1 != p2 || s1 != s2 =>
-          LessThanOrEqual(Cast(e1, DecimalType.Unlimited), Cast(e2, DecimalType.Unlimited))
-
-        case GreaterThan(e1 @ DecimalType.Expression(p1, s1),
-                         e2 @ DecimalType.Expression(p2, s2)) if p1 != p2 || s1 != s2 =>
-          GreaterThan(Cast(e1, DecimalType.Unlimited), Cast(e2, DecimalType.Unlimited))
-
-        case GreaterThanOrEqual(e1 @ DecimalType.Expression(p1, s1),
-                                e2 @ DecimalType.Expression(p2, s2)) if p1 != p2 || s1 != s2 =>
-          GreaterThanOrEqual(Cast(e1, DecimalType.Unlimited), Cast(e2, DecimalType.Unlimited))
+        // When we compare 2 decimal types with different precisions, cast them to the smallest
+        // common precision.
+        case b @ BinaryComparison(e1 @ DecimalType.Expression(p1, s1),
+                                  e2 @ DecimalType.Expression(p2, s2)) if p1 != p2 || s1 != s2 =>
+          val resultType = DecimalType(max(p1, p2), max(s1, s2))
+          b.makeCopy(Array(Cast(e1, resultType), Cast(e2, resultType)))
+        case b @ BinaryComparison(e1 @ DecimalType.Fixed(_, _), e2)
+          if e2.dataType == DecimalType.Unlimited =>
+          b.makeCopy(Array(Cast(e1, DecimalType.Unlimited), e2))
+        case b @ BinaryComparison(e1, e2 @ DecimalType.Fixed(_, _))
+          if e1.dataType == DecimalType.Unlimited =>
+          b.makeCopy(Array(e1, Cast(e2, DecimalType.Unlimited)))
 
         // Promote integers inside a binary expression with fixed-precision decimals to decimals,
         // and fixed-precision decimals in an expression with floats / doubles to doubles
@@ -570,7 +578,7 @@ trait HiveTypeCoercion {
 
       case a @ CreateArray(children) if !a.resolved =>
         val commonType = a.childTypes.reduce(
-          (a, b) => findTightestCommonType(a, b).getOrElse(StringType))
+          (a, b) => findTightestCommonTypeOfTwo(a, b).getOrElse(StringType))
         CreateArray(
           children.map(c => if (c.dataType == commonType) c else Cast(c, commonType)))
 
@@ -599,14 +607,9 @@ trait HiveTypeCoercion {
       // from the list. So we need to make sure the return type is deterministic and
       // compatible with every child column.
       case Coalesce(es) if es.map(_.dataType).distinct.size > 1 =>
-        val dt: Option[DataType] = Some(NullType)
         val types = es.map(_.dataType)
-        val rt = types.foldLeft(dt)((r, c) => r match {
-          case None => None
-          case Some(d) => findTightestCommonType(d, c)
-        })
-        rt match {
-          case Some(finaldt) => Coalesce(es.map(Cast(_, finaldt)))
+        findTightestCommonType(types) match {
+          case Some(finalDataType) => Coalesce(es.map(Cast(_, finalDataType)))
           case None =>
             sys.error(s"Could not determine return type of Coalesce for ${types.mkString(",")}")
         }
@@ -619,17 +622,13 @@ trait HiveTypeCoercion {
    */
   object Division extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
-      // Skip nodes who's children have not been resolved yet.
-      case e if !e.childrenResolved => e
+      // Skip nodes that have not been resolved yet,
+      // as this is an extra rule that should be applied last.
+      case e if !e.resolved => e
 
       // Decimal and Double remain the same
-      case d: Divide if d.resolved && d.dataType == DoubleType => d
-      case d: Divide if d.resolved && d.dataType.isInstanceOf[DecimalType] => d
-
-      case Divide(l, r) if l.dataType.isInstanceOf[DecimalType] =>
-        Divide(l, Cast(r, DecimalType.Unlimited))
-      case Divide(l, r) if r.dataType.isInstanceOf[DecimalType] =>
-        Divide(Cast(l, DecimalType.Unlimited), r)
+      case d: Divide if d.dataType == DoubleType => d
+      case d: Divide if d.dataType.isInstanceOf[DecimalType] => d
 
       case Divide(l, r) => Divide(Cast(l, DoubleType), Cast(r, DoubleType))
     }
@@ -642,42 +641,33 @@ trait HiveTypeCoercion {
     import HiveTypeCoercion._
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
-      case cw: CaseWhenLike if cw.childrenResolved && !cw.valueTypesEqual =>
-        logDebug(s"Input values for null casting ${cw.valueTypes.mkString(",")}")
-        val commonType = cw.valueTypes.reduce { (v1, v2) =>
-          findTightestCommonType(v1, v2).getOrElse(sys.error(
-            s"Types in CASE WHEN must be the same or coercible to a common type: $v1 != $v2"))
-        }
-        val transformedBranches = cw.branches.sliding(2, 2).map {
-          case Seq(when, value) if value.dataType != commonType =>
-            Seq(when, Cast(value, commonType))
-          case Seq(elseVal) if elseVal.dataType != commonType =>
-            Seq(Cast(elseVal, commonType))
-          case s => s
-        }.reduce(_ ++ _)
-        cw match {
-          case _: CaseWhen =>
-            CaseWhen(transformedBranches)
-          case CaseKeyWhen(key, _) =>
-            CaseKeyWhen(key, transformedBranches)
-        }
-
-      case ckw: CaseKeyWhen if ckw.childrenResolved && !ckw.resolved =>
-        val commonType = (ckw.key +: ckw.whenList).map(_.dataType).reduce { (v1, v2) =>
-          findTightestCommonType(v1, v2).getOrElse(sys.error(
-            s"Types in CASE WHEN must be the same or coercible to a common type: $v1 != $v2"))
-        }
-        val transformedBranches = ckw.branches.sliding(2, 2).map {
-          case Seq(when, then) if when.dataType != commonType =>
-            Seq(Cast(when, commonType), then)
-          case s => s
-        }.reduce(_ ++ _)
-        val transformedKey = if (ckw.key.dataType != commonType) {
-          Cast(ckw.key, commonType)
-        } else {
-          ckw.key
-        }
-        CaseKeyWhen(transformedKey, transformedBranches)
+      case c: CaseWhenLike if c.childrenResolved && !c.valueTypesEqual =>
+        logDebug(s"Input values for null casting ${c.valueTypes.mkString(",")}")
+        val maybeCommonType = findTightestCommonType(c.valueTypes)
+        maybeCommonType.map { commonType =>
+          val castedBranches = c.branches.grouped(2).map {
+            case Seq(when, value) if value.dataType != commonType =>
+              Seq(when, Cast(value, commonType))
+            case Seq(elseVal) if elseVal.dataType != commonType =>
+              Seq(Cast(elseVal, commonType))
+            case other => other
+          }.reduce(_ ++ _)
+          c match {
+            case _: CaseWhen => CaseWhen(castedBranches)
+            case CaseKeyWhen(key, _) => CaseKeyWhen(key, castedBranches)
+          }
+        }.getOrElse(c)
+
+      case c: CaseKeyWhen if c.childrenResolved && !c.resolved =>
+        val maybeCommonType = findTightestCommonType((c.key +: c.whenList).map(_.dataType))
+        maybeCommonType.map { commonType =>
+          val castedBranches = c.branches.grouped(2).map {
+            case Seq(when, then) if when.dataType != commonType =>
+              Seq(Cast(when, commonType), then)
+            case other => other
+          }.reduce(_ ++ _)
+          CaseKeyWhen(Cast(c.key, commonType), castedBranches)
+        }.getOrElse(c)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCheckResult.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCheckResult.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCheckResult.scala
new file mode 100644
index 0000000..79c3528
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCheckResult.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+/**
+ * Represents the result of `Expression.checkInputDataTypes`.
+ * We will throw `AnalysisException` in `CheckAnalysis` if `isFailure` is true.
+ */
+trait TypeCheckResult {
+  def isFailure: Boolean = !isSuccess
+  def isSuccess: Boolean
+}
+
+object TypeCheckResult {
+
+  /**
+   * Represents the successful result of `Expression.checkInputDataTypes`.
+   */
+  object TypeCheckSuccess extends TypeCheckResult {
+    def isSuccess: Boolean = true
+  }
+
+  /**
+   * Represents the failing result of `Expression.checkInputDataTypes`,
+   * with an error message to show the reason for the failure.
+   */
+  case class TypeCheckFailure(message: String) extends TypeCheckResult {
+    def isSuccess: Boolean = false
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index adc6505..3cf851a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types._
@@ -53,11 +53,12 @@ abstract class Expression extends TreeNode[Expression] {
 
   /**
    * Returns `true` if this expression and all its children have been resolved to a specific schema
-   * and `false` if it still contains any unresolved placeholders. Implementations of expressions
-   * should override this if the resolution of this type of expression involves more than just
-   * the resolution of its children.
+   * and input data types checking passed, and `false` if it still contains any unresolved
+   * placeholders or has data types mismatch.
+   * Implementations of expressions should override this if the resolution of this type of
+   * expression involves more than just the resolution of its children and type checking.
    */
-  lazy val resolved: Boolean = childrenResolved
+  lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess
 
   /**
    * Returns the [[DataType]] of the result of evaluating this expression.  It is
@@ -94,12 +95,21 @@ abstract class Expression extends TreeNode[Expression] {
       case (i1, i2) => i1 == i2
     }
   }
+
+  /**
+   * Checks the input data types, returns `TypeCheckResult.TypeCheckSuccess` if it's valid,
+   * or returns a `TypeCheckResult` with an error message if invalid.
+   * Note: it's not valid to call this method until `childrenResolved == true`
+   * TODO: we should remove the default implementation and implement it for all
+   * expressions with proper error messages.
+   */
+  def checkInputDataTypes(): TypeCheckResult = TypeCheckResult.TypeCheckSuccess
 }
 
 abstract class BinaryExpression extends Expression with trees.BinaryNode[Expression] {
   self: Product =>
 
-  def symbol: String
+  def symbol: String = sys.error(s"BinaryExpressions must override either toString or symbol")
 
   override def foldable: Boolean = left.foldable && right.foldable
 
@@ -133,7 +143,13 @@ case class GroupExpression(children: Seq[Expression]) extends Expression {
  * so that the proper type conversions can be performed in the analyzer.
  */
 trait ExpectsInputTypes {
+  self: Expression =>
 
   def expectedChildTypes: Seq[DataType]
 
+  override def checkInputDataTypes(): TypeCheckResult = {
+    // We will always do type casting for `ExpectsInputTypes` in `HiveTypeCoercion`,
+    // so a type mismatch error won't be reported here, but for the underlying `Cast`s.
+    TypeCheckResult.TypeCheckSuccess
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index f2299d5..2ac53f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -17,72 +17,89 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._
 
-case class UnaryMinus(child: Expression) extends UnaryExpression {
+abstract class UnaryArithmetic extends UnaryExpression {
+  self: Product =>
 
-  override def dataType: DataType = child.dataType
   override def foldable: Boolean = child.foldable
   override def nullable: Boolean = child.nullable
-  override def toString: String = s"-$child"
-
-  lazy val numeric = dataType match {
-    case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
-    case other => sys.error(s"Type $other does not support numeric operations")
-  }
+  override def dataType: DataType = child.dataType
 
   override def eval(input: Row): Any = {
     val evalE = child.eval(input)
     if (evalE == null) {
       null
     } else {
-      numeric.negate(evalE)
+      evalInternal(evalE)
     }
   }
+
+  protected def evalInternal(evalE: Any): Any =
+    sys.error(s"UnaryArithmetics must override either eval or evalInternal")
 }
 
-case class Sqrt(child: Expression) extends UnaryExpression {
+case class UnaryMinus(child: Expression) extends UnaryArithmetic {
+  override def toString: String = s"-$child"
+
+  override def checkInputDataTypes(): TypeCheckResult =
+    TypeUtils.checkForNumericExpr(child.dataType, "operator -")
 
+  private lazy val numeric = TypeUtils.getNumeric(dataType)
+
+  protected override def evalInternal(evalE: Any) = numeric.negate(evalE)
+}
+
+case class Sqrt(child: Expression) extends UnaryArithmetic {
   override def dataType: DataType = DoubleType
-  override def foldable: Boolean = child.foldable
   override def nullable: Boolean = true
   override def toString: String = s"SQRT($child)"
 
-  lazy val numeric = child.dataType match {
-    case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
-    case other => sys.error(s"Type $other does not support non-negative numeric operations")
-  }
+  override def checkInputDataTypes(): TypeCheckResult =
+    TypeUtils.checkForNumericExpr(child.dataType, "function sqrt")
 
-  override def eval(input: Row): Any = {
-    val evalE = child.eval(input)
-    if (evalE == null) {
-      null
-    } else {
-      val value = numeric.toDouble(evalE)
-      if (value < 0) null
-      else math.sqrt(value)
-    }
+  private lazy val numeric = TypeUtils.getNumeric(child.dataType)
+
+  protected override def evalInternal(evalE: Any) = {
+    val value = numeric.toDouble(evalE)
+    if (value < 0) null
+    else math.sqrt(value)
   }
 }
 
+/**
+ * A function that gets the absolute value of the numeric value.
+ */
+case class Abs(child: Expression) extends UnaryArithmetic {
+  override def toString: String = s"Abs($child)"
+
+  override def checkInputDataTypes(): TypeCheckResult =
+    TypeUtils.checkForNumericExpr(child.dataType, "function abs")
+
+  private lazy val numeric = TypeUtils.getNumeric(dataType)
+
+  protected override def evalInternal(evalE: Any) = numeric.abs(evalE)
+}
+
 abstract class BinaryArithmetic extends BinaryExpression {
   self: Product =>
 
-  override lazy val resolved =
-    left.resolved && right.resolved &&
-    left.dataType == right.dataType &&
-    !DecimalType.isFixed(left.dataType)
-
-  override def dataType: DataType = {
-    if (!resolved) {
-      throw new UnresolvedException(this,
-        s"datatype. Can not resolve due to differing types ${left.dataType}, ${right.dataType}")
+  override def dataType: DataType = left.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (left.dataType != right.dataType) {
+      TypeCheckResult.TypeCheckFailure(
+        s"differing types in ${this.getClass.getSimpleName} " +
+        s"(${left.dataType} and ${right.dataType}).")
+    } else {
+      checkTypesInternal(dataType)
     }
-    left.dataType
   }
 
+  protected def checkTypesInternal(t: DataType): TypeCheckResult
+
   override def eval(input: Row): Any = {
     val evalE1 = left.eval(input)
     if(evalE1 == null) {
@@ -97,88 +114,65 @@ abstract class BinaryArithmetic extends BinaryExpression {
     }
   }
 
-  def evalInternal(evalE1: Any, evalE2: Any): Any =
-    sys.error(s"BinaryExpressions must either override eval or evalInternal")
+  protected def evalInternal(evalE1: Any, evalE2: Any): Any =
+    sys.error(s"BinaryArithmetics must override either eval or evalInternal")
 }
 
 case class Add(left: Expression, right: Expression) extends BinaryArithmetic {
   override def symbol: String = "+"
 
-  lazy val numeric = dataType match {
-    case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
-    case other => sys.error(s"Type $other does not support numeric operations")
-  }
+  override lazy val resolved =
+    childrenResolved && checkInputDataTypes().isSuccess && !DecimalType.isFixed(dataType)
 
-  override def eval(input: Row): Any = {
-    val evalE1 = left.eval(input)
-    if(evalE1 == null) {
-      null
-    } else {
-      val evalE2 = right.eval(input)
-      if (evalE2 == null) {
-        null
-      } else {
-        numeric.plus(evalE1, evalE2)
-      }
-    }
-  }
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForNumericExpr(t, "operator " + symbol)
+
+  private lazy val numeric = TypeUtils.getNumeric(dataType)
+
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = numeric.plus(evalE1, evalE2)
 }
 
 case class Subtract(left: Expression, right: Expression) extends BinaryArithmetic {
   override def symbol: String = "-"
 
-  lazy val numeric = dataType match {
-    case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
-    case other => sys.error(s"Type $other does not support numeric operations")
-  }
+  override lazy val resolved =
+    childrenResolved && checkInputDataTypes().isSuccess && !DecimalType.isFixed(dataType)
 
-  override def eval(input: Row): Any = {
-    val evalE1 = left.eval(input)
-    if(evalE1 == null) {
-      null
-    } else {
-      val evalE2 = right.eval(input)
-      if (evalE2 == null) {
-        null
-      } else {
-        numeric.minus(evalE1, evalE2)
-      }
-    }
-  }
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForNumericExpr(t, "operator " + symbol)
+
+  private lazy val numeric = TypeUtils.getNumeric(dataType)
+
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = numeric.minus(evalE1, evalE2)
 }
 
 case class Multiply(left: Expression, right: Expression) extends BinaryArithmetic {
   override def symbol: String = "*"
 
-  lazy val numeric = dataType match {
-    case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
-    case other => sys.error(s"Type $other does not support numeric operations")
-  }
+  override lazy val resolved =
+    childrenResolved && checkInputDataTypes().isSuccess && !DecimalType.isFixed(dataType)
 
-  override def eval(input: Row): Any = {
-    val evalE1 = left.eval(input)
-    if(evalE1 == null) {
-      null
-    } else {
-      val evalE2 = right.eval(input)
-      if (evalE2 == null) {
-        null
-      } else {
-        numeric.times(evalE1, evalE2)
-      }
-    }
-  }
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForNumericExpr(t, "operator " + symbol)
+
+  private lazy val numeric = TypeUtils.getNumeric(dataType)
+
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = numeric.times(evalE1, evalE2)
 }
 
 case class Divide(left: Expression, right: Expression) extends BinaryArithmetic {
   override def symbol: String = "/"
-
   override def nullable: Boolean = true
 
-  lazy val div: (Any, Any) => Any = dataType match {
+  override lazy val resolved =
+    childrenResolved && checkInputDataTypes().isSuccess && !DecimalType.isFixed(dataType)
+
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForNumericExpr(t, "operator " + symbol)
+
+  private lazy val div: (Any, Any) => Any = dataType match {
     case ft: FractionalType => ft.fractional.asInstanceOf[Fractional[Any]].div
     case it: IntegralType => it.integral.asInstanceOf[Integral[Any]].quot
-    case other => sys.error(s"Type $other does not support numeric operations")
   }
 
   override def eval(input: Row): Any = {
@@ -198,13 +192,17 @@ case class Divide(left: Expression, right: Expression) extends BinaryArithmetic
 
 case class Remainder(left: Expression, right: Expression) extends BinaryArithmetic {
   override def symbol: String = "%"
-
   override def nullable: Boolean = true
 
-  lazy val integral = dataType match {
+  override lazy val resolved =
+    childrenResolved && checkInputDataTypes().isSuccess && !DecimalType.isFixed(dataType)
+
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForNumericExpr(t, "operator " + symbol)
+
+  private lazy val integral = dataType match {
     case i: IntegralType => i.integral.asInstanceOf[Integral[Any]]
     case i: FractionalType => i.asIntegral.asInstanceOf[Integral[Any]]
-    case other => sys.error(s"Type $other does not support numeric operations")
   }
 
   override def eval(input: Row): Any = {
@@ -228,7 +226,10 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet
 case class BitwiseAnd(left: Expression, right: Expression) extends BinaryArithmetic {
   override def symbol: String = "&"
 
-  lazy val and: (Any, Any) => Any = dataType match {
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForBitwiseExpr(t, "operator " + symbol)
+
+  private lazy val and: (Any, Any) => Any = dataType match {
     case ByteType =>
       ((evalE1: Byte, evalE2: Byte) => (evalE1 & evalE2).toByte).asInstanceOf[(Any, Any) => Any]
     case ShortType =>
@@ -237,10 +238,9 @@ case class BitwiseAnd(left: Expression, right: Expression) extends BinaryArithme
       ((evalE1: Int, evalE2: Int) => evalE1 & evalE2).asInstanceOf[(Any, Any) => Any]
     case LongType =>
       ((evalE1: Long, evalE2: Long) => evalE1 & evalE2).asInstanceOf[(Any, Any) => Any]
-    case other => sys.error(s"Unsupported bitwise & operation on $other")
   }
 
-  override def evalInternal(evalE1: Any, evalE2: Any): Any = and(evalE1, evalE2)
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = and(evalE1, evalE2)
 }
 
 /**
@@ -249,7 +249,10 @@ case class BitwiseAnd(left: Expression, right: Expression) extends BinaryArithme
 case class BitwiseOr(left: Expression, right: Expression) extends BinaryArithmetic {
   override def symbol: String = "|"
 
-  lazy val or: (Any, Any) => Any = dataType match {
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForBitwiseExpr(t, "operator " + symbol)
+
+  private lazy val or: (Any, Any) => Any = dataType match {
     case ByteType =>
       ((evalE1: Byte, evalE2: Byte) => (evalE1 | evalE2).toByte).asInstanceOf[(Any, Any) => Any]
     case ShortType =>
@@ -258,10 +261,9 @@ case class BitwiseOr(left: Expression, right: Expression) extends BinaryArithmet
       ((evalE1: Int, evalE2: Int) => evalE1 | evalE2).asInstanceOf[(Any, Any) => Any]
     case LongType =>
       ((evalE1: Long, evalE2: Long) => evalE1 | evalE2).asInstanceOf[(Any, Any) => Any]
-    case other => sys.error(s"Unsupported bitwise | operation on $other")
   }
 
-  override def evalInternal(evalE1: Any, evalE2: Any): Any = or(evalE1, evalE2)
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = or(evalE1, evalE2)
 }
 
 /**
@@ -270,7 +272,10 @@ case class BitwiseOr(left: Expression, right: Expression) extends BinaryArithmet
 case class BitwiseXor(left: Expression, right: Expression) extends BinaryArithmetic {
   override def symbol: String = "^"
 
-  lazy val xor: (Any, Any) => Any = dataType match {
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForBitwiseExpr(t, "operator " + symbol)
+
+  private lazy val xor: (Any, Any) => Any = dataType match {
     case ByteType =>
       ((evalE1: Byte, evalE2: Byte) => (evalE1 ^ evalE2).toByte).asInstanceOf[(Any, Any) => Any]
     case ShortType =>
@@ -279,23 +284,21 @@ case class BitwiseXor(left: Expression, right: Expression) extends BinaryArithme
       ((evalE1: Int, evalE2: Int) => evalE1 ^ evalE2).asInstanceOf[(Any, Any) => Any]
     case LongType =>
       ((evalE1: Long, evalE2: Long) => evalE1 ^ evalE2).asInstanceOf[(Any, Any) => Any]
-    case other => sys.error(s"Unsupported bitwise ^ operation on $other")
   }
 
-  override def evalInternal(evalE1: Any, evalE2: Any): Any = xor(evalE1, evalE2)
+  protected override def evalInternal(evalE1: Any, evalE2: Any): Any = xor(evalE1, evalE2)
 }
 
 /**
  * A function that calculates bitwise not(~) of a number.
  */
-case class BitwiseNot(child: Expression) extends UnaryExpression {
-
-  override def dataType: DataType = child.dataType
-  override def foldable: Boolean = child.foldable
-  override def nullable: Boolean = child.nullable
+case class BitwiseNot(child: Expression) extends UnaryArithmetic {
   override def toString: String = s"~$child"
 
-  lazy val not: (Any) => Any = dataType match {
+  override def checkInputDataTypes(): TypeCheckResult =
+    TypeUtils.checkForBitwiseExpr(child.dataType, "operator ~")
+
+  private lazy val not: (Any) => Any = dataType match {
     case ByteType =>
       ((evalE: Byte) => (~evalE).toByte).asInstanceOf[(Any) => Any]
     case ShortType =>
@@ -304,43 +307,18 @@ case class BitwiseNot(child: Expression) extends UnaryExpression {
       ((evalE: Int) => ~evalE).asInstanceOf[(Any) => Any]
     case LongType =>
       ((evalE: Long) => ~evalE).asInstanceOf[(Any) => Any]
-    case other => sys.error(s"Unsupported bitwise ~ operation on $other")
   }
 
-  override def eval(input: Row): Any = {
-    val evalE = child.eval(input)
-    if (evalE == null) {
-      null
-    } else {
-      not(evalE)
-    }
-  }
+  protected override def evalInternal(evalE: Any) = not(evalE)
 }
 
-case class MaxOf(left: Expression, right: Expression) extends Expression {
-
-  override def foldable: Boolean = left.foldable && right.foldable
-
+case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
   override def nullable: Boolean = left.nullable && right.nullable
 
-  override def children: Seq[Expression] = left :: right :: Nil
-
-  override lazy val resolved =
-    left.resolved && right.resolved &&
-    left.dataType == right.dataType
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForOrderingExpr(t, "function maxOf")
 
-  override def dataType: DataType = {
-    if (!resolved) {
-      throw new UnresolvedException(this,
-        s"datatype. Can not resolve due to differing types ${left.dataType}, ${right.dataType}")
-    }
-    left.dataType
-  }
-
-  lazy val ordering = left.dataType match {
-    case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-    case other => sys.error(s"Type $other does not support ordered operations")
-  }
+  private lazy val ordering = TypeUtils.getOrdering(dataType)
 
   override def eval(input: Row): Any = {
     val evalE1 = left.eval(input)
@@ -361,30 +339,13 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
   override def toString: String = s"MaxOf($left, $right)"
 }
 
-case class MinOf(left: Expression, right: Expression) extends Expression {
-
-  override def foldable: Boolean = left.foldable && right.foldable
-
+case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
   override def nullable: Boolean = left.nullable && right.nullable
 
-  override def children: Seq[Expression] = left :: right :: Nil
+  protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForOrderingExpr(t, "function minOf")
 
-  override lazy val resolved =
-    left.resolved && right.resolved &&
-    left.dataType == right.dataType
-
-  override def dataType: DataType = {
-    if (!resolved) {
-      throw new UnresolvedException(this,
-        s"datatype. Can not resolve due to differing types ${left.dataType}, ${right.dataType}")
-    }
-    left.dataType
-  }
-
-  lazy val ordering = left.dataType match {
-    case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-    case other => sys.error(s"Type $other does not support ordered operations")
-  }
+  private lazy val ordering = TypeUtils.getOrdering(dataType)
 
   override def eval(input: Row): Any = {
     val evalE1 = left.eval(input)
@@ -404,28 +365,3 @@ case class MinOf(left: Expression, right: Expression) extends Expression {
 
   override def toString: String = s"MinOf($left, $right)"
 }
-
-/**
- * A function that get the absolute value of the numeric value.
- */
-case class Abs(child: Expression) extends UnaryExpression  {
-
-  override def dataType: DataType = child.dataType
-  override def foldable: Boolean = child.foldable
-  override def nullable: Boolean = child.nullable
-  override def toString: String = s"Abs($child)"
-
-  lazy val numeric = dataType match {
-    case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
-    case other => sys.error(s"Type $other does not support numeric operations")
-  }
-
-  override def eval(input: Row): Any = {
-    val evalE = child.eval(input)
-    if (evalE == null) {
-      null
-    } else {
-      numeric.abs(evalE)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathfuncs/binary.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathfuncs/binary.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathfuncs/binary.scala
index 01f62ba..db853a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathfuncs/binary.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathfuncs/binary.scala
@@ -29,17 +29,10 @@ import org.apache.spark.sql.types._
 abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)
   extends BinaryExpression with Serializable with ExpectsInputTypes { self: Product =>
 
-  override def symbol: String = null
   override def expectedChildTypes: Seq[DataType] = Seq(DoubleType, DoubleType)
 
-  override def nullable: Boolean = left.nullable || right.nullable
   override def toString: String = s"$name($left, $right)"
 
-  override lazy val resolved =
-    left.resolved && right.resolved &&
-      left.dataType == right.dataType &&
-      !DecimalType.isFixed(left.dataType)
-
   override def dataType: DataType = DoubleType
 
   override def eval(input: Row): Any = {
@@ -58,9 +51,8 @@ abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)
   }
 }
 
-case class Atan2(
-    left: Expression,
-    right: Expression) extends BinaryMathExpression(math.atan2, "ATAN2") {
+case class Atan2(left: Expression, right: Expression)
+  extends BinaryMathExpression(math.atan2, "ATAN2") {
 
   override def eval(input: Row): Any = {
     val evalE1 = left.eval(input)
@@ -80,8 +72,7 @@ case class Atan2(
   }
 }
 
-case class Hypot(
-    left: Expression,
-    right: Expression) extends BinaryMathExpression(math.hypot, "HYPOT")
+case class Hypot(left: Expression, right: Expression)
+  extends BinaryMathExpression(math.hypot, "HYPOT")
 
 case class Pow(left: Expression, right: Expression) extends BinaryMathExpression(math.pow, "POWER")

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 4f422d6..807021d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.{DataType, BinaryType, BooleanType, AtomicType}
+import org.apache.spark.sql.catalyst.util.TypeUtils
+import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType}
 
 object InterpretedPredicate {
   def create(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
@@ -171,22 +171,51 @@ case class Or(left: Expression, right: Expression)
 
 abstract class BinaryComparison extends BinaryExpression with Predicate {
   self: Product =>
-}
 
-case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def symbol: String = "="
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (left.dataType != right.dataType) {
+      TypeCheckResult.TypeCheckFailure(
+        s"differing types in ${this.getClass.getSimpleName} " +
+        s"(${left.dataType} and ${right.dataType}).")
+    } else {
+      checkTypesInternal(dataType)
+    }
+  }
+
+  protected def checkTypesInternal(t: DataType): TypeCheckResult
 
   override def eval(input: Row): Any = {
-    val l = left.eval(input)
-    if (l == null) {
+    val evalE1 = left.eval(input)
+    if (evalE1 == null) {
       null
     } else {
-      val r = right.eval(input)
-      if (r == null) null
-      else if (left.dataType != BinaryType) l == r
-      else java.util.Arrays.equals(l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]])
+      val evalE2 = right.eval(input)
+      if (evalE2 == null) {
+        null
+      } else {
+        evalInternal(evalE1, evalE2)
+      }
     }
   }
+
+  protected def evalInternal(evalE1: Any, evalE2: Any): Any =
+    sys.error(s"BinaryComparisons must override either eval or evalInternal")
+}
+
+object BinaryComparison {
+  def unapply(b: BinaryComparison): Option[(Expression, Expression)] =
+    Some((b.left, b.right))
+}
+
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def symbol: String = "="
+
+  override protected def checkTypesInternal(t: DataType) = TypeCheckResult.TypeCheckSuccess
+
+  protected override def evalInternal(l: Any, r: Any) = {
+    if (left.dataType != BinaryType) l == r
+    else java.util.Arrays.equals(l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]])
+  }
 }
 
 case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComparison {
@@ -194,6 +223,8 @@ case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComp
 
   override def nullable: Boolean = false
 
+  override protected def checkTypesInternal(t: DataType) = TypeCheckResult.TypeCheckSuccess
+
   override def eval(input: Row): Any = {
     val l = left.eval(input)
     val r = right.eval(input)
@@ -210,117 +241,45 @@ case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComp
 case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
   override def symbol: String = "<"
 
-  lazy val ordering: Ordering[Any] = {
-    if (left.dataType != right.dataType) {
-      throw new TreeNodeException(this,
-        s"Types do not match ${left.dataType} != ${right.dataType}")
-    }
-    left.dataType match {
-      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-      case other => sys.error(s"Type $other does not support ordered operations")
-    }
-  }
+  override protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForOrderingExpr(left.dataType, "operator " + symbol)
 
-  override def eval(input: Row): Any = {
-    val evalE1 = left.eval(input)
-    if (evalE1 == null) {
-      null
-    } else {
-      val evalE2 = right.eval(input)
-      if (evalE2 == null) {
-        null
-      } else {
-        ordering.lt(evalE1, evalE2)
-      }
-    }
-  }
+  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = ordering.lt(evalE1, evalE2)
 }
 
 case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
   override def symbol: String = "<="
 
-  lazy val ordering: Ordering[Any] = {
-    if (left.dataType != right.dataType) {
-      throw new TreeNodeException(this,
-        s"Types do not match ${left.dataType} != ${right.dataType}")
-    }
-    left.dataType match {
-      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-      case other => sys.error(s"Type $other does not support ordered operations")
-    }
-  }
+  override protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForOrderingExpr(left.dataType, "operator " + symbol)
 
-  override def eval(input: Row): Any = {
-    val evalE1 = left.eval(input)
-    if (evalE1 == null) {
-      null
-    } else {
-      val evalE2 = right.eval(input)
-      if (evalE2 == null) {
-        null
-      } else {
-        ordering.lteq(evalE1, evalE2)
-      }
-    }
-  }
+  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = ordering.lteq(evalE1, evalE2)
 }
 
 case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
   override def symbol: String = ">"
 
-  lazy val ordering: Ordering[Any] = {
-    if (left.dataType != right.dataType) {
-      throw new TreeNodeException(this,
-        s"Types do not match ${left.dataType} != ${right.dataType}")
-    }
-    left.dataType match {
-      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-      case other => sys.error(s"Type $other does not support ordered operations")
-    }
-  }
+  override protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForOrderingExpr(left.dataType, "operator " + symbol)
 
-  override def eval(input: Row): Any = {
-    val evalE1 = left.eval(input)
-    if(evalE1 == null) {
-      null
-    } else {
-      val evalE2 = right.eval(input)
-      if (evalE2 == null) {
-        null
-      } else {
-        ordering.gt(evalE1, evalE2)
-      }
-    }
-  }
+  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = ordering.gt(evalE1, evalE2)
 }
 
 case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
   override def symbol: String = ">="
 
-  lazy val ordering: Ordering[Any] = {
-    if (left.dataType != right.dataType) {
-      throw new TreeNodeException(this,
-        s"Types do not match ${left.dataType} != ${right.dataType}")
-    }
-    left.dataType match {
-      case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-      case other => sys.error(s"Type $other does not support ordered operations")
-    }
-  }
+  override protected def checkTypesInternal(t: DataType) =
+    TypeUtils.checkForOrderingExpr(left.dataType, "operator " + symbol)
 
-  override def eval(input: Row): Any = {
-    val evalE1 = left.eval(input)
-    if (evalE1 == null) {
-      null
-    } else {
-      val evalE2 = right.eval(input)
-      if (evalE2 == null) {
-        null
-      } else {
-        ordering.gteq(evalE1, evalE2)
-      }
-    }
-  }
+  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+
+  protected override def evalInternal(evalE1: Any, evalE2: Any) = ordering.gteq(evalE1, evalE2)
 }
 
 case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
@@ -329,16 +288,20 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
   override def children: Seq[Expression] = predicate :: trueValue :: falseValue :: Nil
   override def nullable: Boolean = trueValue.nullable || falseValue.nullable
 
-  override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
-  override def dataType: DataType = {
-    if (!resolved) {
-      throw new UnresolvedException(
-        this,
-        s"Can not resolve due to differing types ${trueValue.dataType}, ${falseValue.dataType}")
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (predicate.dataType != BooleanType) {
+      TypeCheckResult.TypeCheckFailure(
+        s"type of predicate expression in If should be boolean, not ${predicate.dataType}")
+    } else if (trueValue.dataType != falseValue.dataType) {
+      TypeCheckResult.TypeCheckFailure(
+        s"differing types in If (${trueValue.dataType} and ${falseValue.dataType}).")
+    } else {
+      TypeCheckResult.TypeCheckSuccess
     }
-    trueValue.dataType
   }
 
+  override def dataType: DataType = trueValue.dataType
+
   override def eval(input: Row): Any = {
     if (true == predicate.eval(input)) {
       trueValue.eval(input)
@@ -364,17 +327,23 @@ trait CaseWhenLike extends Expression {
     branches.sliding(2, 2).collect { case Seq(_, thenExpr) => thenExpr }.toSeq
   val elseValue = if (branches.length % 2 == 0) None else Option(branches.last)
 
-  // both then and else val should be considered.
+  // both then and else expressions should be considered.
   def valueTypes: Seq[DataType] = (thenList ++ elseValue).map(_.dataType)
   def valueTypesEqual: Boolean = valueTypes.distinct.size == 1
 
-  override def dataType: DataType = {
-    if (!resolved) {
-      throw new UnresolvedException(this, "cannot resolve due to differing types in some branches")
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (valueTypesEqual) {
+      checkTypesInternal()
+    } else {
+      TypeCheckResult.TypeCheckFailure(
+        "THEN and ELSE expressions should all be same type or coercible to a common type")
     }
-    valueTypes.head
   }
 
+  protected def checkTypesInternal(): TypeCheckResult
+
+  override def dataType: DataType = thenList.head.dataType
+
   override def nullable: Boolean = {
     // If no value is nullable and no elseValue is provided, the whole statement defaults to null.
     thenList.exists(_.nullable) || (elseValue.map(_.nullable).getOrElse(true))
@@ -395,10 +364,16 @@ case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike {
 
   override def children: Seq[Expression] = branches
 
-  override lazy val resolved: Boolean =
-    childrenResolved &&
-    whenList.forall(_.dataType == BooleanType) &&
-    valueTypesEqual
+  override protected def checkTypesInternal(): TypeCheckResult = {
+    if (whenList.forall(_.dataType == BooleanType)) {
+      TypeCheckResult.TypeCheckSuccess
+    } else {
+      val index = whenList.indexWhere(_.dataType != BooleanType)
+      TypeCheckResult.TypeCheckFailure(
+        s"WHEN expressions in CaseWhen should all be boolean type, " +
+        s"but the ${index + 1}th when expression's type is ${whenList(index)}")
+    }
+  }
 
   /** Written in imperative fashion for performance considerations. */
   override def eval(input: Row): Any = {
@@ -441,9 +416,14 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseW
 
   override def children: Seq[Expression] = key +: branches
 
-  override lazy val resolved: Boolean =
-    childrenResolved && valueTypesEqual &&
-    (key +: whenList).map(_.dataType).distinct.size == 1
+  override protected def checkTypesInternal(): TypeCheckResult = {
+    if ((key +: whenList).map(_.dataType).distinct.size > 1) {
+      TypeCheckResult.TypeCheckFailure(
+        "key and WHEN expressions should all be same type or coercible to a common type")
+    } else {
+      TypeCheckResult.TypeCheckSuccess
+    }
+  }
 
   /** Written in imperative fashion for performance considerations. */
   override def eval(input: Row): Any = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b25fb48..5c6379b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -273,6 +273,10 @@ object NullPropagation extends Rule[LogicalPlan] {
       case e @ Substring(_, Literal(null, _), _) => Literal.create(null, e.dataType)
       case e @ Substring(_, _, Literal(null, _)) => Literal.create(null, e.dataType)
 
+      // MaxOf and MinOf can't do null propagation
+      case e: MaxOf => e
+      case e: MinOf => e
+
       // Put exceptional cases above if any
       case e: BinaryArithmetic => e.children match {
         case Literal(null, _) :: right :: Nil => Literal.create(null, e.dataType)

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
index 3f92be4..ad649ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
@@ -24,7 +24,7 @@ import java.util.{Calendar, TimeZone}
 import org.apache.spark.sql.catalyst.expressions.Cast
 
 /**
- * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
+ * Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
  */
 object DateUtils {
   private val MILLIS_PER_DAY = 86400000

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
new file mode 100644
index 0000000..0bb12d2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.types._
+
+/**
+ * Helper functions to check for valid data types
+ */
+object TypeUtils {
+  def checkForNumericExpr(t: DataType, caller: String): TypeCheckResult = {
+    if (t.isInstanceOf[NumericType] || t == NullType) {
+      TypeCheckResult.TypeCheckSuccess
+    } else {
+      TypeCheckResult.TypeCheckFailure(s"$caller accepts numeric types, not $t")
+    }
+  }
+
+  def checkForBitwiseExpr(t: DataType, caller: String): TypeCheckResult = {
+    if (t.isInstanceOf[IntegralType] || t == NullType) {
+      TypeCheckResult.TypeCheckSuccess
+    } else {
+      TypeCheckResult.TypeCheckFailure(s"$caller accepts integral types, not $t")
+    }
+  }
+
+  def checkForOrderingExpr(t: DataType, caller: String): TypeCheckResult = {
+    if (t.isInstanceOf[AtomicType] || t == NullType) {
+      TypeCheckResult.TypeCheckSuccess
+    } else {
+      TypeCheckResult.TypeCheckFailure(s"$caller accepts non-complex types, not $t")
+    }
+  }
+
+  def getNumeric(t: DataType): Numeric[Any] =
+    t.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
+
+  def getOrdering(t: DataType): Ordering[Any] =
+    t.asInstanceOf[AtomicType].ordering.asInstanceOf[Ordering[Any]]
+}
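
A quick usage sketch for the new helpers (return values follow directly from the definitions above; NullType is accepted by all three checks, presumably so untyped null literals stay legal until coercion assigns them a concrete type):

    import org.apache.spark.sql.catalyst.util.TypeUtils
    import org.apache.spark.sql.types._

    TypeUtils.checkForNumericExpr(IntegerType, "operator +")
    //  => TypeCheckSuccess
    TypeUtils.checkForNumericExpr(StringType, "operator +")
    //  => TypeCheckFailure("operator + accepts numeric types, not StringType")
    TypeUtils.checkForBitwiseExpr(DoubleType, "operator ~")
    //  => TypeCheckFailure("operator ~ accepts integral types, not DoubleType")
    TypeUtils.checkForOrderingExpr(NullType, "operator <")
    //  => TypeCheckSuccess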

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 1ba3a26..74677dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -107,7 +107,7 @@ protected[sql] abstract class AtomicType extends DataType {
 abstract class NumericType extends AtomicType {
   // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
   // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
-  // type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
+  // type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
  // desugared by the compiler into an argument to the object's constructor. This means there is no
  // longer a no-argument constructor and thus the JVM cannot serialize the object anymore.
   private[sql] val numeric: Numeric[InternalType]

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 1b8d18d..7bac97b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -92,8 +92,10 @@ class DecimalPrecisionSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Comparison operations") {
-    checkComparison(LessThan(i, d1), DecimalType.Unlimited)
-    checkComparison(LessThanOrEqual(d1, d2), DecimalType.Unlimited)
+    checkComparison(EqualTo(i, d1), DecimalType(10, 1))
+    checkComparison(EqualNullSafe(d2, d1), DecimalType(5, 2))
+    checkComparison(LessThan(i, d1), DecimalType(10, 1))
+    checkComparison(LessThanOrEqual(d1, d2), DecimalType(5, 2))
     checkComparison(GreaterThan(d2, u), DecimalType.Unlimited)
     checkComparison(GreaterThanOrEqual(d1, f), DoubleType)
     checkComparison(GreaterThan(d2, d2), DecimalType(5, 2))
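
These updated expectations exercise the new decimal type coercion rule for binary comparison: rather than widening both sides to DecimalType.Unlimited, comparisons now cast to a concrete common type. Restated as a sketch (the fixture types are inferred from the expected results, since the hunk does not show their declarations):

    // Assumed fixtures: i: IntegerType, d1: DecimalType(2, 1), d2: DecimalType(5, 2)
    EqualTo(i, d1)          // both sides cast to DecimalType(10, 1)
    EqualNullSafe(d2, d1)   // both sides cast to DecimalType(5, 2)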

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
index a079842..0df4466 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
@@ -28,11 +28,11 @@ class HiveTypeCoercionSuite extends PlanTest {
 
   test("tightest common bound for types") {
     def widenTest(t1: DataType, t2: DataType, tightestCommon: Option[DataType]) {
-      var found = HiveTypeCoercion.findTightestCommonType(t1, t2)
+      var found = HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2)
       assert(found == tightestCommon,
         s"Expected $tightestCommon as tightest common type for $t1 and $t2, found $found")
       // Test both directions to make sure the widening is symmetric.
-      found = HiveTypeCoercion.findTightestCommonType(t2, t1)
+      found = HiveTypeCoercion.findTightestCommonTypeOfTwo(t2, t1)
       assert(found == tightestCommon,
         s"Expected $tightestCommon as tightest common type for $t2 and $t1, found $found")
     }
@@ -140,13 +140,10 @@ class HiveTypeCoercionSuite extends PlanTest {
       CaseKeyWhen(Literal(1.toShort), Seq(Literal(1), Literal("a"))),
       CaseKeyWhen(Cast(Literal(1.toShort), IntegerType), Seq(Literal(1), Literal("a")))
     )
-    // Will remove exception expectation in PR#6405
-    intercept[RuntimeException] {
-      ruleTest(cwc,
-        CaseKeyWhen(Literal(true), Seq(Literal(1), Literal("a"))),
-        CaseKeyWhen(Literal(true), Seq(Literal(1), Literal("a")))
-      )
-    }
+    ruleTest(cwc,
+      CaseKeyWhen(Literal(true), Seq(Literal(1), Literal("a"))),
+      CaseKeyWhen(Literal(true), Seq(Literal(1), Literal("a")))
+    )
   }
 
   test("type coercion simplification for equal to") {

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionTypeCheckingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionTypeCheckingSuite.scala
new file mode 100644
index 0000000..dcb3635
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionTypeCheckingSuite.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types.StringType
+
+class ExpressionTypeCheckingSuite extends SparkFunSuite {
+
+  val testRelation = LocalRelation(
+    'intField.int,
+    'stringField.string,
+    'booleanField.boolean,
+    'complexField.array(StringType))
+
+  def assertError(expr: Expression, errorMessage: String): Unit = {
+    val e = intercept[AnalysisException] {
+      assertSuccess(expr)
+    }
+    assert(e.getMessage.contains(
+      s"cannot resolve '${expr.prettyString}' due to data type mismatch:"))
+    assert(e.getMessage.contains(errorMessage))
+  }
+
+  def assertSuccess(expr: Expression): Unit = {
+    val analyzed = testRelation.select(expr.as("c")).analyze
+    SimpleAnalyzer.checkAnalysis(analyzed)
+  }
+
+  def assertErrorForDifferingTypes(expr: Expression): Unit = {
+    assertError(expr,
+      s"differing types in ${expr.getClass.getSimpleName} (IntegerType and BooleanType).")
+  }
+
+  test("check types for unary arithmetic") {
+    assertError(UnaryMinus('stringField), "operator - accepts numeric type")
+    assertSuccess(Sqrt('stringField)) // We will cast String to Double for sqrt
+    assertError(Sqrt('booleanField), "function sqrt accepts numeric type")
+    assertError(Abs('stringField), "function abs accepts numeric type")
+    assertError(BitwiseNot('stringField), "operator ~ accepts integral type")
+  }
+
+  test("check types for binary arithmetic") {
+    // We will cast String to Double for binary arithmetic
+    assertSuccess(Add('intField, 'stringField))
+    assertSuccess(Subtract('intField, 'stringField))
+    assertSuccess(Multiply('intField, 'stringField))
+    assertSuccess(Divide('intField, 'stringField))
+    assertSuccess(Remainder('intField, 'stringField))
+    // checkAnalysis(BitwiseAnd('intField, 'stringField))
+
+    assertErrorForDifferingTypes(Add('intField, 'booleanField))
+    assertErrorForDifferingTypes(Subtract('intField, 'booleanField))
+    assertErrorForDifferingTypes(Multiply('intField, 'booleanField))
+    assertErrorForDifferingTypes(Divide('intField, 'booleanField))
+    assertErrorForDifferingTypes(Remainder('intField, 'booleanField))
+    assertErrorForDifferingTypes(BitwiseAnd('intField, 'booleanField))
+    assertErrorForDifferingTypes(BitwiseOr('intField, 'booleanField))
+    assertErrorForDifferingTypes(BitwiseXor('intField, 'booleanField))
+    assertErrorForDifferingTypes(MaxOf('intField, 'booleanField))
+    assertErrorForDifferingTypes(MinOf('intField, 'booleanField))
+
+    assertError(Add('booleanField, 'booleanField), "operator + accepts numeric type")
+    assertError(Subtract('booleanField, 'booleanField), "operator - accepts numeric type")
+    assertError(Multiply('booleanField, 'booleanField), "operator * accepts numeric type")
+    assertError(Divide('booleanField, 'booleanField), "operator / accepts numeric type")
+    assertError(Remainder('booleanField, 'booleanField), "operator % accepts numeric type")
+
+    assertError(BitwiseAnd('booleanField, 'booleanField), "operator & accepts integral type")
+    assertError(BitwiseOr('booleanField, 'booleanField), "operator | accepts integral type")
+    assertError(BitwiseXor('booleanField, 'booleanField), "operator ^ accepts integral type")
+
+    assertError(MaxOf('complexField, 'complexField), "function maxOf accepts non-complex type")
+    assertError(MinOf('complexField, 'complexField), "function minOf accepts non-complex type")
+  }
+
+  test("check types for predicates") {
+    // We will cast String to Double for binary comparison
+    assertSuccess(EqualTo('intField, 'stringField))
+    assertSuccess(EqualNullSafe('intField, 'stringField))
+    assertSuccess(LessThan('intField, 'stringField))
+    assertSuccess(LessThanOrEqual('intField, 'stringField))
+    assertSuccess(GreaterThan('intField, 'stringField))
+    assertSuccess(GreaterThanOrEqual('intField, 'stringField))
+
+    // We will transform EqualTo with numeric and boolean types to CaseKeyWhen
+    assertSuccess(EqualTo('intField, 'booleanField))
+    assertSuccess(EqualNullSafe('intField, 'booleanField))
+
+    assertError(EqualTo('intField, 'complexField), "differing types")
+    assertError(EqualNullSafe('intField, 'complexField), "differing types")
+
+    assertErrorForDifferingTypes(LessThan('intField, 'booleanField))
+    assertErrorForDifferingTypes(LessThanOrEqual('intField, 'booleanField))
+    assertErrorForDifferingTypes(GreaterThan('intField, 'booleanField))
+    assertErrorForDifferingTypes(GreaterThanOrEqual('intField, 'booleanField))
+
+    assertError(
+      LessThan('complexField, 'complexField), "operator < accepts non-complex type")
+    assertError(
+      LessThanOrEqual('complexField, 'complexField), "operator <= accepts non-complex type")
+    assertError(
+      GreaterThan('complexField, 'complexField), "operator > accepts non-complex type")
+    assertError(
+      GreaterThanOrEqual('complexField, 'complexField), "operator >= accepts non-complex type")
+
+    assertError(
+      If('intField, 'stringField, 'stringField),
+      "type of predicate expression in If should be boolean")
+    assertErrorForDifferingTypes(If('booleanField, 'intField, 'booleanField))
+
+    assertError(
+      CaseWhen(Seq('booleanField, 'intField, 'booleanField, 'complexField)),
+      "THEN and ELSE expressions should all be same type or coercible to a common type")
+    assertError(
+      CaseKeyWhen('intField, Seq('intField, 'stringField, 'intField, 'complexField)),
+      "THEN and ELSE expressions should all be same type or coercible to a common type")
+    assertError(
+      CaseWhen(Seq('booleanField, 'intField, 'intField, 'intField)),
+      "WHEN expressions in CaseWhen should all be boolean type")
+
+  }
+}
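
For readers skimming the suite, a hedged end-to-end sketch of what one failing case produces (the exact prettyString rendering below is an assumption, not taken from this diff):

    val expr = Add('intField, 'booleanField)
    val e = intercept[AnalysisException] { assertSuccess(expr) }
    // e.getMessage contains something like:
    //   cannot resolve '(intField + booleanField)' due to data type mismatch:
    //   differing types in Add (IntegerType and BooleanType).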

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
index 06aa19e..565d102 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
@@ -147,7 +147,7 @@ private[sql] object InferSchema {
    * Returns the most general data type for two given data types.
    */
   private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
-    HiveTypeCoercion.findTightestCommonType(t1, t2).getOrElse {
+    HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2).getOrElse {
       // t1 or t2 is a StructType, ArrayType, or an unexpected type.
       (t1, t2) match {
         case (other: DataType, NullType) => other

http://git-wip-us.apache.org/repos/asf/spark/blob/d38cf217/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 95eb117..7e1e21f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -155,7 +155,7 @@ private[sql] object JsonRDD extends Logging {
    * Returns the most general data type for two given data types.
    */
   private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
-    HiveTypeCoercion.findTightestCommonType(t1, t2) match {
+    HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2) match {
       case Some(commonType) => commonType
       case None =>
         // t1 or t2 is a StructType, ArrayType, or an unexpected type.
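
Both JSON call sites are mechanical renames; behavior is unchanged. A hedged sketch of the fallback pattern they share (the specific widening results are assumed from the usual numeric promotion rules, not shown in this diff):

    HiveTypeCoercion.findTightestCommonTypeOfTwo(IntegerType, LongType)
    //  => Some(LongType): plain numeric widening, no fallback needed
    HiveTypeCoercion.findTightestCommonTypeOfTwo(StructType(Nil), IntegerType)
    //  => None: compatibleType then falls back to its own (t1, t2) match,
    //     e.g. (other, NullType) => other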

