Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/21 02:04:50 UTC

[7/9] SPARK-1251 Support for optimizing and executing structured queries

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
new file mode 100644
index 0000000..c253587
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+import trees._
+
+/**
+ * Functions for attaching and retrieving trees that are associated with errors.
+ */
+package object errors {
+
+  class TreeNodeException[TreeType <: TreeNode[_]]
+    (tree: TreeType, msg: String, cause: Throwable) extends Exception(msg, cause) {
+
+    // Yes, this is the same as a default parameter, but... those don't seem to work with SBT
+    // external project dependencies for some reason.
+    def this(tree: TreeType, msg: String) = this(tree, msg, null)
+
+    override def getMessage: String = {
+      val treeString = tree.toString
+      s"${super.getMessage}, tree:${if (treeString contains "\n") "\n" else " "}$tree"
+    }
+  }
+
+  /**
+   *  Wraps any exceptions that are thrown while executing `f` in a
+   *  [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.
+   */
+  def attachTree[TreeType <: TreeNode[_], A](tree: TreeType, msg: String = "")(f: => A): A = {
+    try f catch {
+      case e: Exception => throw new TreeNodeException(tree, msg, e)
+    }
+  }
+
+  /**
+   * Executes `f` which is expected to throw a
+   * [[catalyst.errors.TreeNodeException TreeNodeException]]. The first tree encountered in
+   * the stack of exceptions of type `TreeType` is returned.
+   */
+  def getTree[TreeType <: TreeNode[_]](f: => Unit): TreeType = ??? // TODO: Implement
+}
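
To illustrate the intended use of attachTree, here is a minimal standalone
sketch (not part of this patch; the node and exception types are simplified
stand-ins for the real catalyst.trees API):

    // Simplified stand-in for a catalyst TreeNode.
    case class FakeNode(name: String) { override def toString = name }

    class NodeException(tree: FakeNode, msg: String, cause: Throwable)
      extends Exception(s"$msg, tree: $tree", cause)

    // Same shape as attachTree above: run `f`, and if it throws, rethrow
    // with the offending subtree attached for debugging.
    def attachTree[A](tree: FakeNode, msg: String = "")(f: => A): A =
      try f catch { case e: Exception => throw new NodeException(tree, msg, e) }

    val node = FakeNode("Add(1, x)")
    val result = attachTree(node, "evaluating") { 1 + 41 }  // returns 42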

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
new file mode 100644
index 0000000..3b6bac1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import rules._
+import errors._
+
+import catalyst.plans.QueryPlan
+
+/**
+ * A bound reference points to a specific slot in the input tuple, allowing the actual value
+ * to be retrieved more efficiently.  However, since operations like column pruning can change
+ * the layout of intermediate tuples, BindReferences should be run after all such transformations.
+ */
+case class BoundReference(ordinal: Int, baseReference: Attribute)
+  extends Attribute with trees.LeafNode[Expression] {
+
+  type EvaluatedType = Any
+
+  def nullable = baseReference.nullable
+  def dataType = baseReference.dataType
+  def exprId = baseReference.exprId
+  def qualifiers = baseReference.qualifiers
+  def name = baseReference.name
+
+  def newInstance = BoundReference(ordinal, baseReference.newInstance)
+  def withQualifiers(newQualifiers: Seq[String]) =
+    BoundReference(ordinal, baseReference.withQualifiers(newQualifiers))
+
+  override def toString = s"$baseReference:$ordinal"
+
+  override def apply(input: Row): Any = input(ordinal)
+}
+
+class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
+  import BindReferences._
+
+  def apply(plan: TreeNode): TreeNode = {
+    plan.transform {
+      case leafNode if leafNode.children.isEmpty => leafNode
+      case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e =>
+        bindReference(e, unaryNode.children.head.output)
+      }
+    }
+  }
+}
+
+object BindReferences extends Logging {
+  def bindReference(expression: Expression, input: Seq[Attribute]): Expression = {
+    expression.transform { case a: AttributeReference =>
+      attachTree(a, "Binding attribute") {
+        val ordinal = input.indexWhere(_.exprId == a.exprId)
+        if (ordinal == -1) {
+          // TODO: This fallback is required because some operators (such as ScriptTransform)
+          // produce new attributes that can't be bound.  Likely the right thing to do is remove
+          // this rule and require all operators to explicitly bind to the input schema that
+          // they specify.
+          logger.debug(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
+          a
+        } else {
+          BoundReference(ordinal, a)
+        }
+      }
+    }
+  }
+}
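
The essence of the binding step is the exprId lookup: an attribute reference
is replaced by the ordinal of the matching attribute in the child operator's
output. A standalone sketch of just that lookup (hypothetical stand-in types,
not part of this patch):

    case class FakeAttribute(name: String, exprId: Long)

    // Resolve an attribute to its slot in the input schema; -1 signals an
    // attribute the input does not produce (the rule above logs and keeps it).
    def ordinalOf(a: FakeAttribute, input: Seq[FakeAttribute]): Int =
      input.indexWhere(_.exprId == a.exprId)

    val schema = Seq(FakeAttribute("a", 1L), FakeAttribute("b", 2L))
    assert(ordinalOf(FakeAttribute("b", 2L), schema) == 1)
    assert(ordinalOf(FakeAttribute("c", 3L), schema) == -1)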

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
new file mode 100644
index 0000000..608656d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+/** Cast the child expression to the target data type. */
+case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
+  override def foldable = child.foldable
+  def nullable = child.nullable
+  override def toString = s"CAST($child, $dataType)"
+
+  type EvaluatedType = Any
+
+  lazy val castingFunction: Any => Any = (child.dataType, dataType) match {
+    case (BinaryType, StringType) => a: Any => new String(a.asInstanceOf[Array[Byte]])
+    case (StringType, BinaryType) => a: Any => a.asInstanceOf[String].getBytes
+    case (_, StringType) => a: Any => a.toString
+    case (StringType, IntegerType) => a: Any => castOrNull(a, _.toInt)
+    case (StringType, DoubleType) => a: Any => castOrNull(a, _.toDouble)
+    case (StringType, FloatType) => a: Any => castOrNull(a, _.toFloat)
+    case (StringType, LongType) => a: Any => castOrNull(a, _.toLong)
+    case (StringType, ShortType) => a: Any => castOrNull(a, _.toShort)
+    case (StringType, ByteType) => a: Any => castOrNull(a, _.toByte)
+    case (StringType, DecimalType) => a: Any => castOrNull(a, BigDecimal(_))
+    case (BooleanType, ByteType) => a: Any => a match {
+      case null => null
+      case true => 1.toByte
+      case false => 0.toByte
+    }
+    case (dt, IntegerType) =>
+      a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toInt(a)
+    case (dt, DoubleType) =>
+      a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toDouble(a)
+    case (dt, FloatType) =>
+      a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toFloat(a)
+    case (dt, LongType) =>
+      a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toLong(a)
+    case (dt, ShortType) =>
+      a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toInt(a).toShort
+    case (dt, ByteType) =>
+      a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toInt(a).toByte
+    case (dt, DecimalType) =>
+      a: Any =>
+        BigDecimal(dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toDouble(a))
+  }
+
+  @inline
+  protected def castOrNull[A](a: Any, f: String => A) =
+    try f(a.asInstanceOf[String]) catch {
+      case _: java.lang.NumberFormatException => null
+    }
+
+  override def apply(input: Row): Any = {
+    val evaluated = child.apply(input)
+    if (evaluated == null) {
+      null
+    } else {
+      castingFunction(evaluated)
+    }
+  }
+}
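
The castOrNull helper is what gives string casts their SQL semantics: an
unparsable string yields NULL instead of raising an error. A standalone
sketch of the same behavior (not part of this patch):

    // Mirrors the helper above: parse failures become null, not exceptions.
    def castOrNull[A](a: Any, f: String => A): Any =
      try f(a.asInstanceOf[String]) catch {
        case _: NumberFormatException => null
      }

    assert(castOrNull("42", _.toInt) == 42)
    assert(castOrNull("forty-two", _.toInt) == null)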

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
new file mode 100644
index 0000000..78aaaee
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import errors._
+import trees._
+import types._
+
+abstract class Expression extends TreeNode[Expression] {
+  self: Product =>
+
+  /** The narrowest possible type that is produced when this expression is evaluated. */
+  type EvaluatedType <: Any
+
+  def dataType: DataType
+
+  /**
+   * Returns true when an expression is a candidate for static evaluation before the query is
+   * executed.
+   *
+   * The following conditions are used to determine suitability for constant folding:
+   *  - A [[expressions.Coalesce Coalesce]] is foldable if all of its children are foldable
+   *  - A [[expressions.BinaryExpression BinaryExpression]] is foldable if both its left and
+   *    right children are foldable
+   *  - A [[expressions.Not Not]], [[expressions.IsNull IsNull]], or
+   *    [[expressions.IsNotNull IsNotNull]] is foldable if its child is foldable.
+   *  - A [[expressions.Literal]] is foldable.
+   *  - A [[expressions.Cast Cast]] or [[expressions.UnaryMinus UnaryMinus]] is foldable if its
+   *    child is foldable.
+   */
+  // TODO: Support more foldable expressions. For example, deterministic Hive UDFs.
+  def foldable: Boolean = false
+  def nullable: Boolean
+  def references: Set[Attribute]
+
+  /** Returns the result of evaluating this expression on a given input Row */
+  def apply(input: Row = null): EvaluatedType =
+    throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
+  /**
+   * Returns `true` if this expression and all its children have been resolved to a specific schema
+   * and `false` if it still contains any unresolved placeholders. Implementations of expressions
+   * should override this if the resolution of this type of expression involves more than just
+   * the resolution of its children.
+   */
+  lazy val resolved: Boolean = childrenResolved
+
+  /**
+   * Returns true if all the children of this expression have been resolved to a specific schema
+   * and false if any of them still contains unresolved placeholders.
+   */
+  def childrenResolved = !children.exists(!_.resolved)
+
+  /**
+   * A set of helper functions that return the correct descendant of [[scala.math.Numeric]] type
+   * and perform any casting necessary on the results of child evaluation.
+   */
+  @inline
+  def n1(e: Expression, i: Row, f: ((Numeric[Any], Any) => Any)): Any = {
+    val evalE = e.apply(i)
+    if (evalE == null) {
+      null
+    } else {
+      e.dataType match {
+        case n: NumericType =>
+          val castedFunction = f.asInstanceOf[(Numeric[n.JvmType], n.JvmType) => n.JvmType]
+          castedFunction(n.numeric, evalE.asInstanceOf[n.JvmType])
+        case other => sys.error(s"Type $other does not support numeric operations")
+      }
+    }
+  }
+
+  @inline
+  protected final def n2(
+      i: Row,
+      e1: Expression,
+      e2: Expression,
+      f: ((Numeric[Any], Any, Any) => Any)): Any = {
+
+    if (e1.dataType != e2.dataType) {
+      throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
+    }
+
+    val evalE1 = e1.apply(i)
+    if (evalE1 == null) {
+      null
+    } else {
+      val evalE2 = e2.apply(i)
+      if (evalE2 == null) {
+        null
+      } else {
+        e1.dataType match {
+          case n: NumericType =>
+            f.asInstanceOf[(Numeric[n.JvmType], n.JvmType, n.JvmType) => n.JvmType](
+              n.numeric, evalE1.asInstanceOf[n.JvmType], evalE2.asInstanceOf[n.JvmType])
+          case other => sys.error(s"Type $other does not support numeric operations")
+        }
+      }
+    }
+  }
+
+  @inline
+  protected final def f2(
+      i: Row,
+      e1: Expression,
+      e2: Expression,
+      f: ((Fractional[Any], Any, Any) => Any)): Any = {
+    if (e1.dataType != e2.dataType) {
+      throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
+    }
+
+    val evalE1 = e1.apply(i)
+    if (evalE1 == null) {
+      null
+    } else {
+      val evalE2 = e2.apply(i)
+      if (evalE2 == null) {
+        null
+      } else {
+        e1.dataType match {
+          case ft: FractionalType =>
+            f.asInstanceOf[(Fractional[ft.JvmType], ft.JvmType, ft.JvmType) => ft.JvmType](
+              ft.fractional, evalE1.asInstanceOf[ft.JvmType], evalE2.asInstanceOf[ft.JvmType])
+          case other => sys.error(s"Type $other does not support fractional operations")
+        }
+      }
+    }
+  }
+
+  @inline
+  protected final def i2(
+      i: Row,
+      e1: Expression,
+      e2: Expression,
+      f: ((Integral[Any], Any, Any) => Any)): Any = {
+    if (e1.dataType != e2.dataType) {
+      throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
+    }
+
+    val evalE1 = e1.apply(i)
+    if (evalE1 == null) {
+      null
+    } else {
+      val evalE2 = e2.apply(i)
+      if (evalE2 == null) {
+        null
+      } else {
+        e1.dataType match {
+          case i: IntegralType =>
+            f.asInstanceOf[(Integral[i.JvmType], i.JvmType, i.JvmType) => i.JvmType](
+              i.integral, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType])
+          case other => sys.error(s"Type $other does not support numeric operations")
+        }
+      }
+    }
+  }
+}
+
+abstract class BinaryExpression extends Expression with trees.BinaryNode[Expression] {
+  self: Product =>
+
+  def symbol: String
+
+  override def foldable = left.foldable && right.foldable
+
+  def references = left.references ++ right.references
+
+  override def toString = s"($left $symbol $right)"
+}
+
+abstract class LeafExpression extends Expression with trees.LeafNode[Expression] {
+  self: Product =>
+}
+
+abstract class UnaryExpression extends Expression with trees.UnaryNode[Expression] {
+  self: Product =>
+
+  def references = child.references
+}
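
The n1/n2/f2/i2 helpers all share one shape: evaluate the children,
short-circuit to null if any input is null, then dispatch through the
matching scala.math typeclass. A standalone sketch of the two-argument case
(hypothetical simplification, not part of this patch):

    // SQL-style null propagation plus dispatch through a Numeric instance,
    // analogous to n2 above.
    def nullSafeNumeric[T](left: Any, right: Any)(f: (Numeric[T], T, T) => T)
                          (implicit num: Numeric[T]): Any =
      if (left == null || right == null) null
      else f(num, left.asInstanceOf[T], right.asInstanceOf[T])

    assert(nullSafeNumeric[Int](3, 4)(_.plus(_, _)) == 7)
    assert(nullSafeNumeric[Int](3, null)(_.plus(_, _)) == null)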

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
new file mode 100644
index 0000000..8c407d2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+package expressions
+
+/**
+ * Converts a [[Row]] to another Row given a sequence of expressions that define each column of
+ * the new row. If the schema of the input row is specified, then the given expressions will be
+ * bound to that schema.
+ */
+class Projection(expressions: Seq[Expression]) extends (Row => Row) {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+  protected val exprArray = expressions.toArray
+  def apply(input: Row): Row = {
+    val outputArray = new Array[Any](exprArray.size)
+    var i = 0
+    while (i < exprArray.size) {
+      outputArray(i) = exprArray(i).apply(input)
+      i += 1
+    }
+    new GenericRow(outputArray)
+  }
+}
+
+/**
+ * Converts a [[Row]] to another Row given a sequence of expressions that define each column of
+ * the new row. If the schema of the input row is specified, then the given expressions will be
+ * bound to that schema.
+ *
+ * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
+ * each time an input row is added.  This significantly reduces the cost of calculating the
+ * projection, but means it is not safe to hold a reference to the returned row across calls.
+ */
+case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+  private[this] val exprArray = expressions.toArray
+  private[this] val mutableRow = new GenericMutableRow(exprArray.size)
+  def currentValue: Row = mutableRow
+
+  def apply(input: Row): Row = {
+    var i = 0
+    while (i < exprArray.size) {
+      mutableRow(i) = exprArray(i).apply(input)
+      i += 1
+    }
+    mutableRow
+  }
+}
+
+/**
+ * A mutable wrapper that makes two rows appear as a single concatenated row.  Designed to
+ * be instantiated once per thread and reused.
+ */
+class JoinedRow extends Row {
+  private[this] var row1: Row = _
+  private[this] var row2: Row = _
+
+  /** Updates this JoinedRow to point at two new base rows.  Returns itself. */
+  def apply(r1: Row, r2: Row): Row = {
+    row1 = r1
+    row2 = r2
+    this
+  }
+
+  def iterator = row1.iterator ++ row2.iterator
+
+  def length = row1.length + row2.length
+
+  def apply(i: Int) =
+    if (i < row1.size) row1(i) else row2(i - row1.size)
+
+  def isNullAt(i: Int) = apply(i) == null
+
+  def getInt(i: Int): Int =
+    if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)
+
+  def getLong(i: Int): Long =
+    if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)
+
+  def getDouble(i: Int): Double =
+    if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)
+
+  def getBoolean(i: Int): Boolean =
+    if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)
+
+  def getShort(i: Int): Short =
+    if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)
+
+  def getByte(i: Int): Byte =
+    if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)
+
+  def getFloat(i: Int): Float =
+    if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)
+
+  def getString(i: Int): String =
+    if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)
+
+  def copy() = {
+    val totalSize = row1.size + row2.size
+    val copiedValues = new Array[Any](totalSize)
+    var i = 0
+    while (i < totalSize) {
+      copiedValues(i) = apply(i)
+      i += 1
+    }
+    new GenericRow(copiedValues)
+  }
+}
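
The trade-off noted on MutableProjection is easy to demonstrate: because the
same row object is returned on every call, results must be copied before
being buffered. A standalone sketch with a hypothetical stand-in row (not
part of this patch):

    // A tiny mutable "row" reused across invocations, like MutableProjection.
    class ReusedRow(size: Int) {
      val values = new Array[Any](size)
      def copyValues: Seq[Any] = values.toSeq
    }

    val row = new ReusedRow(1)
    // Buffering the returned object directly: every element is the SAME row,
    // so all of them observe the final mutation.
    val aliased = Seq(1, 2, 3).map { i => row.values(0) = i; row }
    assert(aliased.forall(_.values(0) == 3))
    // Copying defensively preserves each intermediate value.
    val copied = Seq(1, 2, 3).map { i => row.values(0) = i; row.copyValues }
    assert(copied.map(_(0)) == Seq(1, 2, 3))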

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala
new file mode 100644
index 0000000..a5d0ecf
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types.DoubleType
+
+case object Rand extends LeafExpression {
+  def dataType = DoubleType
+  def nullable = false
+  def references = Set.empty
+  override def toString = "RAND()"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
new file mode 100644
index 0000000..3529675
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+/**
+ * Represents one row of output from a relational operator.  Allows both generic access by ordinal,
+ * which will incur boxing overhead for primitives, as well as native primitive access.
+ *
+ * It is invalid to use the native primitive interface to retrieve a value that is null; instead,
+ * a user must check [[isNullAt]] before attempting to retrieve a value that might be null.
+ */
+trait Row extends Seq[Any] with Serializable {
+  def apply(i: Int): Any
+
+  def isNullAt(i: Int): Boolean
+
+  def getInt(i: Int): Int
+  def getLong(i: Int): Long
+  def getDouble(i: Int): Double
+  def getFloat(i: Int): Float
+  def getBoolean(i: Int): Boolean
+  def getShort(i: Int): Short
+  def getByte(i: Int): Byte
+  def getString(i: Int): String
+
+  override def toString() =
+    s"[${this.mkString(",")}]"
+
+  def copy(): Row
+}
+
+/**
+ * An extended interface to [[Row]] that allows the values for each column to be updated.  Setting
+ * a value through a primitive function implicitly marks that column as not null.
+ */
+trait MutableRow extends Row {
+  def setNullAt(i: Int): Unit
+
+  def update(ordinal: Int, value: Any)
+
+  def setInt(ordinal: Int, value: Int)
+  def setLong(ordinal: Int, value: Long)
+  def setDouble(ordinal: Int, value: Double)
+  def setBoolean(ordinal: Int, value: Boolean)
+  def setShort(ordinal: Int, value: Short)
+  def setByte(ordinal: Int, value: Byte)
+  def setFloat(ordinal: Int, value: Float)
+  def setString(ordinal: Int, value: String)
+
+  /**
+   * EXPERIMENTAL
+   *
+   * Returns a mutable string builder for the specified column.  A given row should return the
+   * result of any mutations made to the returned buffer next time getString is called for the same
+   * column.
+   */
+  def getStringBuilder(ordinal: Int): StringBuilder
+}
+
+/**
+ * A row with no data.  Calling any methods will result in an error.  Can be used as a placeholder.
+ */
+object EmptyRow extends Row {
+  def apply(i: Int): Any = throw new UnsupportedOperationException
+
+  def iterator = Iterator.empty
+  def length = 0
+  def isNullAt(i: Int): Boolean = throw new UnsupportedOperationException
+
+  def getInt(i: Int): Int = throw new UnsupportedOperationException
+  def getLong(i: Int): Long = throw new UnsupportedOperationException
+  def getDouble(i: Int): Double = throw new UnsupportedOperationException
+  def getFloat(i: Int): Float = throw new UnsupportedOperationException
+  def getBoolean(i: Int): Boolean = throw new UnsupportedOperationException
+  def getShort(i: Int): Short = throw new UnsupportedOperationException
+  def getByte(i: Int): Byte = throw new UnsupportedOperationException
+  def getString(i: Int): String = throw new UnsupportedOperationException
+
+  def copy() = this
+}
+
+/**
+ * A row implementation that uses an array of objects as the underlying storage.  Note that, while
+ * the array is not copied, and thus could technically be mutated after creation, this is not
+ * allowed.
+ */
+class GenericRow(protected[catalyst] val values: Array[Any]) extends Row {
+  /** No-arg constructor for serialization. */
+  def this() = this(null)
+
+  def this(size: Int) = this(new Array[Any](size))
+
+  def iterator = values.iterator
+
+  def length = values.length
+
+  def apply(i: Int) = values(i)
+
+  def isNullAt(i: Int) = values(i) == null
+
+  def getInt(i: Int): Int = {
+    if (values(i) == null) sys.error("Failed to check null bit for primitive int value.")
+    values(i).asInstanceOf[Int]
+  }
+
+  def getLong(i: Int): Long = {
+    if (values(i) == null) sys.error("Failed to check null bit for primitive long value.")
+    values(i).asInstanceOf[Long]
+  }
+
+  def getDouble(i: Int): Double = {
+    if (values(i) == null) sys.error("Failed to check null bit for primitive double value.")
+    values(i).asInstanceOf[Double]
+  }
+
+  def getFloat(i: Int): Float = {
+    if (values(i) == null) sys.error("Failed to check null bit for primitive float value.")
+    values(i).asInstanceOf[Float]
+  }
+
+  def getBoolean(i: Int): Boolean = {
+    if (values(i) == null) sys.error("Failed to check null bit for primitive boolean value.")
+    values(i).asInstanceOf[Boolean]
+  }
+
+  def getShort(i: Int): Short = {
+    if (values(i) == null) sys.error("Failed to check null bit for primitive short value.")
+    values(i).asInstanceOf[Short]
+  }
+
+  def getByte(i: Int): Byte = {
+    if (values(i) == null) sys.error("Failed to check null bit for primitive byte value.")
+    values(i).asInstanceOf[Byte]
+  }
+
+  def getString(i: Int): String = {
+    if (values(i) == null) sys.error("Failed to check null bit for primitive String value.")
+    values(i).asInstanceOf[String]
+  }
+
+  def copy() = this
+}
+
+class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow {
+  /** No-arg constructor for serialization. */
+  def this() = this(0)
+
+  def getStringBuilder(ordinal: Int): StringBuilder = ???
+
+  override def setBoolean(ordinal: Int, value: Boolean): Unit = { values(ordinal) = value }
+  override def setByte(ordinal: Int, value: Byte): Unit = { values(ordinal) = value }
+  override def setDouble(ordinal: Int, value: Double): Unit = { values(ordinal) = value }
+  override def setFloat(ordinal: Int, value: Float): Unit = { values(ordinal) = value }
+  override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value }
+  override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value }
+  override def setString(ordinal: Int, value: String): Unit = { values(ordinal) = value }
+
+  override def setNullAt(i: Int): Unit = { values(i) = null }
+
+  override def setShort(ordinal: Int, value: Short): Unit = { values(ordinal) = value }
+
+  override def update(ordinal: Int, value: Any): Unit = { values(ordinal) = value }
+
+  override def copy() = new GenericRow(values.clone())
+}
+
+
+class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
+  def compare(a: Row, b: Row): Int = {
+    var i = 0
+    while (i < ordering.size) {
+      val order = ordering(i)
+      val left = order.child.apply(a)
+      val right = order.child.apply(b)
+
+      if (left == null && right == null) {
+        // Both null, continue looking.
+      } else if (left == null) {
+        return if (order.direction == Ascending) -1 else 1
+      } else if (right == null) {
+        return if (order.direction == Ascending) 1 else -1
+      } else {
+        val comparison = order.dataType match {
+          case n: NativeType if order.direction == Ascending =>
+            n.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
+          case n: NativeType if order.direction == Descending =>
+            n.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+        }
+        if (comparison != 0) return comparison
+      }
+      i += 1
+    }
+    0
+  }
+}
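
RowOrdering orders NULL before any value for ascending keys (and after any
value for descending), falling through to the next sort key on ties. A
standalone sketch of that comparison over Option-encoded keys (hypothetical
simplification, not part of this patch):

    // None plays the role of NULL; later keys break ties, as above.
    def compareRows(a: Seq[Option[Int]], b: Seq[Option[Int]]): Int = {
      var i = 0
      while (i < a.size) {
        val cmp = (a(i), b(i)) match {
          case (None, None)       => 0
          case (None, _)          => -1  // NULL sorts first ascending
          case (_, None)          => 1
          case (Some(l), Some(r)) => l.compare(r)
        }
        if (cmp != 0) return cmp
        i += 1
      }
      0
    }

    assert(compareRows(Seq(None, Some(5)), Seq(Some(1), Some(0))) < 0)
    assert(compareRows(Seq(Some(1), None), Seq(Some(1), Some(0))) < 0)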

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
new file mode 100644
index 0000000..a3c7ca1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
+  extends Expression {
+
+  type EvaluatedType = Any
+
+  def references = children.flatMap(_.references).toSet
+  def nullable = true
+
+  override def apply(input: Row): Any = {
+    children.size match {
+      case 1 => function.asInstanceOf[(Any) => Any](children(0).apply(input))
+      case 2 =>
+        function.asInstanceOf[(Any, Any) => Any](
+          children(0).apply(input),
+          children(1).apply(input))
+    }
+  }
+}
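
ScalaUdf stores the user's function as an AnyRef and recovers its arity at
call time with a cast (only arities 1 and 2 are handled above). A standalone
sketch of that dispatch (not part of this patch):

    // Hypothetical stand-alone version of the arity dispatch in ScalaUdf.apply.
    def callUdf(function: AnyRef, args: Seq[Any]): Any = args.size match {
      case 1 => function.asInstanceOf[Any => Any](args(0))
      case 2 => function.asInstanceOf[(Any, Any) => Any](args(0), args(1))
      case n => sys.error(s"UDF arity $n not supported")
    }

    val upper = (s: Any) => s.toString.toUpperCase
    assert(callUdf(upper, Seq("spark")) == "SPARK")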

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
new file mode 100644
index 0000000..171997b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+abstract sealed class SortDirection
+case object Ascending extends SortDirection
+case object Descending extends SortDirection
+
+/**
+ * An expression that can be used to sort a tuple.  This class extends Expression primarily so that
+ * transformations over expressions will descend into its child.
+ */
+case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression {
+  def dataType = child.dataType
+  def nullable = child.nullable
+  override def toString = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
new file mode 100644
index 0000000..2ad8d6f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import scala.language.dynamics
+
+import types._
+
+case object DynamicType extends DataType
+
+case class WrapDynamic(children: Seq[Attribute]) extends Expression {
+  type EvaluatedType = DynamicRow
+
+  def nullable = false
+  def references = children.toSet
+  def dataType = DynamicType
+
+  override def apply(input: Row): DynamicRow = input match {
+    // Avoid copy for generic rows.
+    case g: GenericRow => new DynamicRow(children, g.values)
+    case otherRowType => new DynamicRow(children, otherRowType.toArray)
+  }
+}
+
+class DynamicRow(val schema: Seq[Attribute], values: Array[Any])
+  extends GenericRow(values) with Dynamic {
+
+  def selectDynamic(attributeName: String): String = {
+    val ordinal = schema.indexWhere(_.name == attributeName)
+    values(ordinal).toString
+  }
+}
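
DynamicRow leans on Scala's Dynamic trait: the compiler rewrites
`row.someColumn` into `row.selectDynamic("someColumn")`. A standalone sketch
of the mechanism (hypothetical mini version, not part of this patch):

    import scala.language.dynamics

    // Field access by name, resolved against a schema at runtime.
    class NamedRow(schema: Seq[String], values: Array[Any]) extends Dynamic {
      def selectDynamic(attributeName: String): String = {
        val ordinal = schema.indexOf(attributeName)
        values(ordinal).toString
      }
    }

    val row = new NamedRow(Seq("name", "age"), Array("kay", 27))
    assert(row.name == "kay")  // compiles to row.selectDynamic("name")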

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
new file mode 100644
index 0000000..2287a84
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.types._
+
+abstract class AggregateExpression extends Expression {
+  self: Product =>
+
+  /**
+   * Creates a new instance that can be used to compute this aggregate expression for a group
+   * of input rows.
+   */
+  def newInstance: AggregateFunction
+}
+
+/**
+ * Represents an aggregation that has been rewritten to be performed in two steps.
+ *
+ * @param finalEvaluation an aggregate expression that evaluates to the same final result as the
+ *                        original aggregation.
+ * @param partialEvaluations A sequence of [[NamedExpression]]s that can be computed on partial
+ *                           data sets and are required to compute the `finalEvaluation`.
+ */
+case class SplitEvaluation(
+    finalEvaluation: Expression,
+    partialEvaluations: Seq[NamedExpression])
+
+/**
+ * An [[AggregateExpression]] that can be partially computed without seeing all relevant tuples.
+ * These partial evaluations can then be combined to compute the actual answer.
+ */
+abstract class PartialAggregate extends AggregateExpression {
+  self: Product =>
+
+  /**
+   * Returns a [[SplitEvaluation]] that computes this aggregation using partial aggregation.
+   */
+  def asPartial: SplitEvaluation
+}
+
+/**
+ * A specific implementation of an aggregate function. Used to wrap a generic
+ * [[AggregateExpression]] with an algorithm that will be used to compute one specific result.
+ */
+abstract class AggregateFunction
+  extends AggregateExpression with Serializable with trees.LeafNode[Expression] {
+  self: Product =>
+
+  type EvaluatedType = Any
+
+  /** Base should return the generic aggregate expression that this function is computing */
+  val base: AggregateExpression
+  def references = base.references
+  def nullable = base.nullable
+  def dataType = base.dataType
+
+  def update(input: Row): Unit
+  override def apply(input: Row): Any
+
+  // Do we really need this?
+  def newInstance = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
+}
+
+case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+  def references = child.references
+  def nullable = false
+  def dataType = IntegerType
+  override def toString = s"COUNT($child)"
+
+  def asPartial: SplitEvaluation = {
+    val partialCount = Alias(Count(child), "PartialCount")()
+    SplitEvaluation(Sum(partialCount.toAttribute), partialCount :: Nil)
+  }
+
+  override def newInstance = new CountFunction(child, this)
+}
+
+case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpression {
+  def children = expressions
+  def references = expressions.flatMap(_.references).toSet
+  def nullable = false
+  def dataType = IntegerType
+  override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")}})"
+  override def newInstance = new CountDistinctFunction(expressions, this)
+}
+
+case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+  def references = child.references
+  def nullable = false
+  def dataType = DoubleType
+  override def toString = s"AVG($child)"
+
+  override def asPartial: SplitEvaluation = {
+    val partialSum = Alias(Sum(child), "PartialSum")()
+    val partialCount = Alias(Count(child), "PartialCount")()
+    val castedSum = Cast(Sum(partialSum.toAttribute), dataType)
+    val castedCount = Cast(Sum(partialCount.toAttribute), dataType)
+
+    SplitEvaluation(
+      Divide(castedSum, castedCount),
+      partialCount :: partialSum :: Nil)
+  }
+
+  override def newInstance = new AverageFunction(child, this)
+}
+
+case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+  def references = child.references
+  def nullable = false
+  def dataType = child.dataType
+  override def toString = s"SUM($child)"
+
+  override def asPartial: SplitEvaluation = {
+    val partialSum = Alias(Sum(child), "PartialSum")()
+    SplitEvaluation(
+      Sum(partialSum.toAttribute),
+      partialSum :: Nil)
+  }
+
+  override def newInstance = new SumFunction(child, this)
+}
+
+case class SumDistinct(child: Expression)
+  extends AggregateExpression with trees.UnaryNode[Expression] {
+
+  def references = child.references
+  def nullable = false
+  def dataType = child.dataType
+  override def toString = s"SUM(DISTINCT $child)"
+
+  override def newInstance = new SumDistinctFunction(child, this)
+}
+
+case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+  def references = child.references
+  def nullable = child.nullable
+  def dataType = child.dataType
+  override def toString = s"FIRST($child)"
+
+  override def asPartial: SplitEvaluation = {
+    val partialFirst = Alias(First(child), "PartialFirst")()
+    SplitEvaluation(
+      First(partialFirst.toAttribute),
+      partialFirst :: Nil)
+  }
+  override def newInstance = new FirstFunction(child, this)
+}
+
+case class AverageFunction(expr: Expression, base: AggregateExpression)
+  extends AggregateFunction {
+
+  def this() = this(null, null) // Required for serialization.
+
+  private var count: Long = _
+  private val sum = MutableLiteral(Cast(Literal(0), expr.dataType).apply(EmptyRow))
+  private val sumAsDouble = Cast(sum, DoubleType)
+
+  private val addFunction = Add(sum, expr)
+
+  override def apply(input: Row): Any =
+    sumAsDouble.apply(EmptyRow).asInstanceOf[Double] / count.toDouble
+
+  def update(input: Row): Unit = {
+    count += 1
+    sum.update(addFunction, input)
+  }
+}
+
+case class CountFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
+  def this() = this(null, null) // Required for serialization.
+
+  var count: Int = _
+
+  def update(input: Row): Unit = {
+    val evaluatedExpr = expr.map(_.apply(input))
+    if (evaluatedExpr.map(_ != null).reduceLeft(_ || _)) {
+      count += 1
+    }
+  }
+
+  override def apply(input: Row): Any = count
+}
+
+case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
+  def this() = this(null, null) // Required for serialization.
+
+  private val sum = MutableLiteral(Cast(Literal(0), expr.dataType).apply(null))
+
+  private val addFunction = Add(sum, expr)
+
+  def update(input: Row): Unit = {
+    sum.update(addFunction, input)
+  }
+
+  override def apply(input: Row): Any = sum.apply(null)
+}
+
+case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
+  extends AggregateFunction {
+
+  def this() = this(null, null) // Required for serialization.
+
+  val seen = new scala.collection.mutable.HashSet[Any]()
+
+  def update(input: Row): Unit = {
+    val evaluatedExpr = expr.apply(input)
+    if (evaluatedExpr != null) {
+      seen += evaluatedExpr
+    }
+  }
+
+  override def apply(input: Row): Any =
+    seen.reduceLeft(base.dataType.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].plus)
+}
+
+case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpression)
+  extends AggregateFunction {
+
+  def this() = this(null, null) // Required for serialization.
+
+  val seen = new scala.collection.mutable.HashSet[Any]()
+
+  def update(input: Row): Unit = {
+    val evaluatedExpr = expr.map(_.apply(input))
+    if (evaluatedExpr.map(_ != null).reduceLeft(_ && _)) {
+      seen += evaluatedExpr
+    }
+  }
+
+  override def apply(input: Row): Any = seen.size
+}
+
+case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
+  def this() = this(null, null) // Required for serialization.
+
+  var result: Any = null
+
+  def update(input: Row): Unit = {
+    if (result == null) {
+      result = expr.apply(input)
+    }
+  }
+
+  override def apply(input: Row): Any = result
+}
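
SplitEvaluation is what enables map-side partial aggregation: Average, for
example, is rewritten into partial SUM and COUNT columns that are summed at
the final stage and then divided. A worked standalone sketch of that
arithmetic (not part of this patch):

    // Two-phase average over partitions, mirroring Average.asPartial: each
    // partition emits (partialSum, partialCount); the final stage sums both
    // and divides.
    val partitions = Seq(Seq(1.0, 2.0), Seq(3.0), Seq(4.0, 5.0, 6.0))

    val partials = partitions.map(p => (p.sum, p.size.toLong))
    val (totalSum, totalCount) =
      partials.reduce((a, b) => (a._1 + b._1, a._2 + b._2))

    assert(totalSum / totalCount == 3.5)  // same as a single-pass average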

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
new file mode 100644
index 0000000..db23564
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.analysis.UnresolvedException
+import catalyst.types._
+
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+  type EvaluatedType = Any
+
+  def dataType = child.dataType
+  override def foldable = child.foldable
+  def nullable = child.nullable
+  override def toString = s"-$child"
+
+  override def apply(input: Row): Any = {
+    n1(child, input, _.negate(_))
+  }
+}
+
+abstract class BinaryArithmetic extends BinaryExpression {
+  self: Product =>
+
+  type EvaluatedType = Any
+
+  def nullable = left.nullable || right.nullable
+
+  override lazy val resolved =
+    left.resolved && right.resolved && left.dataType == right.dataType
+
+  def dataType = {
+    if (!resolved) {
+      throw new UnresolvedException(this,
+        s"datatype. Can not resolve due to differing types ${left.dataType}, ${right.dataType}")
+    }
+    left.dataType
+  }
+}
+
+case class Add(left: Expression, right: Expression) extends BinaryArithmetic {
+  def symbol = "+"
+
+  override def apply(input: Row): Any = n2(input, left, right, _.plus(_, _))
+}
+
+case class Subtract(left: Expression, right: Expression) extends BinaryArithmetic {
+  def symbol = "-"
+
+  override def apply(input: Row): Any = n2(input, left, right, _.minus(_, _))
+}
+
+case class Multiply(left: Expression, right: Expression) extends BinaryArithmetic {
+  def symbol = "*"
+
+  override def apply(input: Row): Any = n2(input, left, right, _.times(_, _))
+}
+
+case class Divide(left: Expression, right: Expression) extends BinaryArithmetic {
+  def symbol = "/"
+
+  override def apply(input: Row): Any = dataType match {
+    case _: FractionalType => f2(input, left, right, _.div(_, _))
+    case _: IntegralType => i2(input, left, right, _.quot(_, _))
+  }
+}
+
+case class Remainder(left: Expression, right: Expression) extends BinaryArithmetic {
+  def symbol = "%"
+
+  override def apply(input: Row): Any = i2(input, left, right, _.rem(_, _))
+}
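
Divide is the one operator above that cannot go through a single Numeric
instance: integral types need quot while fractional types need div. A
standalone sketch of that dispatch via the scala.math typeclasses
(hypothetical, not part of this patch):

    // Integral uses quot (truncating); Fractional uses div.
    def divide[T](l: T, r: T)(implicit num: Numeric[T]): T = num match {
      case i: Integral[T]   => i.quot(l, r)
      case f: Fractional[T] => f.div(l, r)
      case _ => sys.error("Numeric instance is neither Integral nor Fractional")
    }

    assert(divide(7, 2) == 3)        // Int: truncating division
    assert(divide(7.0, 2.0) == 3.5)  // Double: fractional division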

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
new file mode 100644
index 0000000..d3feb6c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+/**
+ * Returns the item at `ordinal` in the array `child`, or the value for key `ordinal` in map `child`.
+ */
+case class GetItem(child: Expression, ordinal: Expression) extends Expression {
+  type EvaluatedType = Any
+
+  val children = child :: ordinal :: Nil
+  /** `Null` is returned for invalid ordinals. */
+  override def nullable = true
+  override def references = children.flatMap(_.references).toSet
+  def dataType = child.dataType match {
+    case ArrayType(dt) => dt
+    case MapType(_, vt) => vt
+  }
+  override lazy val resolved =
+    childrenResolved &&
+    (child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType])
+
+  override def toString = s"$child[$ordinal]"
+
+  override def apply(input: Row): Any = {
+    if (child.dataType.isInstanceOf[ArrayType]) {
+      val baseValue = child.apply(input).asInstanceOf[Seq[_]]
+      val o = ordinal.apply(input).asInstanceOf[Int]
+      if (baseValue == null) {
+        null
+      } else if (o >= baseValue.size || o < 0) {
+        null
+      } else {
+        baseValue(o)
+      }
+    } else {
+      val baseValue = child.apply(input).asInstanceOf[Map[Any, _]]
+      val key = ordinal.apply(input)
+      if (baseValue == null) {
+        null
+      } else {
+        baseValue.get(key).orNull
+      }
+    }
+  }
+}
+
+/**
+ * Returns the value of the field `fieldName` in the struct `child`.
+ */
+case class GetField(child: Expression, fieldName: String) extends UnaryExpression {
+  type EvaluatedType = Any
+
+  def dataType = field.dataType
+  def nullable = field.nullable
+
+  protected def structType = child.dataType match {
+    case s: StructType => s
+    case otherType => sys.error(s"GetField is not valid on fields of type $otherType")
+  }
+
+  lazy val field =
+    structType.fields
+        .find(_.name == fieldName)
+        .getOrElse(sys.error(s"No such field $fieldName in ${child.dataType}"))
+
+  lazy val ordinal = structType.fields.indexOf(field)
+
+  override lazy val resolved = childrenResolved && child.dataType.isInstanceOf[StructType]
+
+  override def apply(input: Row): Any = {
+    val baseValue = child.apply(input).asInstanceOf[Row]
+    if (baseValue == null) null else baseValue(ordinal)
+  }
+
+  override def toString = s"$child.$fieldName"
+}
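
GetItem deliberately returns NULL for out-of-range array ordinals and missing
map keys rather than throwing. A standalone sketch of those semantics (not
part of this patch):

    // Stand-alone versions of the two GetItem branches above.
    def arrayItem(base: Seq[Any], ordinal: Int): Any =
      if (base == null || ordinal < 0 || ordinal >= base.size) null
      else base(ordinal)

    def mapItem(base: Map[Any, Any], key: Any): Any =
      if (base == null) null else base.get(key).orNull

    assert(arrayItem(Seq(10, 20), 5) == null)             // out of range -> NULL
    assert(mapItem(Map[Any, Any]("a" -> 1), "b") == null) // missing key -> NULL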

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
new file mode 100644
index 0000000..c367de2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.types._
+
+/**
+ * An expression that produces zero or more rows given a single input row.
+ *
+ * Generators produce multiple output rows instead of a single value like other expressions,
+ * and thus they must have a schema to associate with the rows that are output.
+ *
+ * However, unlike row-producing relational operators, which are either leaves or determine their
+ * output schema functionally from their input, generators can contain other expressions that
+ * might be modified by rules.  This structure means that they might be copied
+ * multiple times after first determining their output schema. If a new output schema were created
+ * for each copy, references up the tree would be rendered invalid. As a result, generators must
+ * instead define a function `makeOutput`, which is called only once when the schema is first
+ * requested.  The attributes produced by this function will be automatically copied anytime rules
+ * result in changes to the Generator or its children.
+ */
+abstract class Generator extends Expression with (Row => TraversableOnce[Row]) {
+  self: Product =>
+
+  type EvaluatedType = TraversableOnce[Row]
+
+  lazy val dataType =
+    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable))))
+
+  def nullable = false
+
+  def references = children.flatMap(_.references).toSet
+
+  /**
+   * Should be overridden by specific generators.  Called only once for each instance to ensure
+   * that rule application does not change the output schema of a generator.
+   */
+  protected def makeOutput(): Seq[Attribute]
+
+  private var _output: Seq[Attribute] = null
+
+  def output: Seq[Attribute] = {
+    if (_output == null) {
+      _output = makeOutput()
+    }
+    _output
+  }
+
+  /** Should be implemented by subclasses to produce the rows output for a given input row. */
+  def apply(input: Row): TraversableOnce[Row]
+
+  /** Overridden `makeCopy` also copies the attributes that are produced by this generator. */
+  override def makeCopy(newArgs: Array[AnyRef]): this.type = {
+    val copy = super.makeCopy(newArgs)
+    copy._output = _output
+    copy
+  }
+}
+
+/**
+ * Given an input array produces a sequence of rows for each value in the array.
+ */
+case class Explode(attributeNames: Seq[String], child: Expression)
+  extends Generator with trees.UnaryNode[Expression] {
+
+  override lazy val resolved =
+    child.resolved &&
+    (child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType])
+
+  lazy val elementTypes = child.dataType match {
+    case ArrayType(et) => et :: Nil
+    case MapType(kt, vt) => kt :: vt :: Nil
+  }
+
+  // TODO: Move this pattern into Generator.
+  protected def makeOutput() =
+    if (attributeNames.size == elementTypes.size) {
+      attributeNames.zip(elementTypes).map {
+        case (n, t) => AttributeReference(n, t, nullable = true)()
+      }
+    } else {
+      elementTypes.zipWithIndex.map {
+        case (t, i) => AttributeReference(s"c_$i", t, nullable = true)()
+      }
+    }
+
+  override def apply(input: Row): TraversableOnce[Row] = {
+    child.dataType match {
+      case ArrayType(_) =>
+        val inputArray = child.apply(input).asInstanceOf[Seq[Any]]
+        if (inputArray == null) Nil else inputArray.map(v => new GenericRow(Array(v)))
+      case MapType(_, _) =>
+        val inputMap = child.apply(input).asInstanceOf[Map[Any, Any]]
+        if (inputMap == null) Nil else inputMap.map { case (k, v) => new GenericRow(Array(k, v)) }
+    }
+  }
+
+  override def toString = s"explode($child)"
+}
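
A quick sketch of the generator contract above, assuming the Literal constructor from this change: Explode emits one output row per array element (or per map entry), and its schema is built exactly once by makeOutput:

  val words = Explode("word" :: Nil, Literal(Seq("a", "b"), ArrayType(StringType)))
  words.output              // one AttributeReference, named "word" via attributeNames
  words.apply(null).toSeq   // two single-column rows: ["a"] and ["b"]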

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
new file mode 100644
index 0000000..229d8f7
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+object Literal {
+  def apply(v: Any): Literal = v match {
+    case i: Int => Literal(i, IntegerType)
+    case l: Long => Literal(l, LongType)
+    case d: Double => Literal(d, DoubleType)
+    case f: Float => Literal(f, FloatType)
+    case b: Byte => Literal(b, ByteType)
+    case s: Short => Literal(s, ShortType)
+    case s: String => Literal(s, StringType)
+    case b: Boolean => Literal(b, BooleanType)
+    case null => Literal(null, NullType)
+  }
+}
+
+/**
+ * Extractor for retrieving Int literals.
+ */
+object IntegerLiteral {
+  def unapply(a: Any): Option[Int] = a match {
+    case Literal(a: Int, IntegerType) => Some(a)
+    case _ => None
+  }
+}
+
+case class Literal(value: Any, dataType: DataType) extends LeafExpression {
+
+  override def foldable = true
+  def nullable = value == null
+  def references = Set.empty
+
+  override def toString = if (value != null) value.toString else "null"
+
+  type EvaluatedType = Any
+  override def apply(input: Row): Any = value
+}
+
+// TODO: Specialize
+case class MutableLiteral(var value: Any, nullable: Boolean = true) extends LeafExpression {
+  type EvaluatedType = Any
+
+  val dataType = Literal(value).dataType
+
+  def references = Set.empty
+
+  def update(expression: Expression, input: Row) = {
+    value = expression.apply(input)
+  }
+
+  override def apply(input: Row) = value
+}
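
For illustration, a sketch of the factory, the extractor, and the mutable variant side by side (null is passed for the input row, which literals ignore):

  Literal(42)                    // Literal(42, IntegerType): the factory infers the type
  Literal(1) match {             // the IntegerLiteral extractor in a pattern
    case IntegerLiteral(i) => i  // 1
  }
  val m = MutableLiteral(0)      // dataType is fixed to IntegerType at construction
  m.update(Literal(3), null)     // overwrites the stored value in place
  m.apply(null)                  // 3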

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
new file mode 100644
index 0000000..0a06e85
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.analysis.UnresolvedAttribute
+import types._
+
+object NamedExpression {
+  private val curId = new java.util.concurrent.atomic.AtomicLong()
+  def newExprId = ExprId(curId.getAndIncrement())
+}
+
+/**
+ * A globally unique (within this JVM) id for a given named expression.
+ * Used to identify which attribute output by a relation is being
+ * referenced in a subsequent computation.
+ */
+case class ExprId(id: Long)
+
+abstract class NamedExpression extends Expression {
+  self: Product =>
+
+  def name: String
+  def exprId: ExprId
+  def qualifiers: Seq[String]
+
+  def toAttribute: Attribute
+
+  protected def typeSuffix =
+    if (resolved) {
+      dataType match {
+        case LongType => "L"
+        case _ => ""
+      }
+    } else {
+      ""
+    }
+}
+
+abstract class Attribute extends NamedExpression {
+  self: Product =>
+
+  def withQualifiers(newQualifiers: Seq[String]): Attribute
+
+  def references = Set(this)
+  def toAttribute = this
+  def newInstance: Attribute
+}
+
+/**
+ * Used to assign a new name to a computation.
+ * For example, the SQL expression "1 + 1 AS a" could be represented as follows:
+ *   Alias(Add(Literal(1), Literal(1)), "a")()
+ *
+ * @param child the computation being performed
+ * @param name the name to be associated with the result of computing [[child]].
+ * @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this
+ *               alias. Auto-assigned if left blank.
+ */
+case class Alias(child: Expression, name: String)
+    (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
+  extends NamedExpression with trees.UnaryNode[Expression] {
+
+  type EvaluatedType = Any
+
+  override def apply(input: Row) = child.apply(input)
+
+  def dataType = child.dataType
+  def nullable = child.nullable
+  def references = child.references
+
+  def toAttribute = {
+    if (resolved) {
+      AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
+    } else {
+      UnresolvedAttribute(name)
+    }
+  }
+
+  override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix"
+
+  override protected final def otherCopyArgs = exprId :: qualifiers :: Nil
+}
+
+/**
+ * A reference to an attribute produced by another operator in the tree.
+ *
+ * @param name The name of this attribute, should only be used during analysis or for debugging.
+ * @param dataType The [[types.DataType DataType]] of this attribute.
+ * @param nullable True if null is a valid value for this attribute.
+ * @param exprId A globally unique id used to check if different AttributeReferences refer to the
+ *               same attribute.
+ * @param qualifiers a list of strings that can be used to refer to this attribute in a fully
+ *                   qualified way. Consider the examples tableName.name and subQueryAlias.name,
+ *                   where tableName and subQueryAlias are possible qualifiers.
+ */
+case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
+    (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
+  extends Attribute with trees.LeafNode[Expression] {
+
+  override def equals(other: Any) = other match {
+    case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
+    case _ => false
+  }
+
+  override def hashCode: Int = {
+    // See http://stackoverflow.com/questions/113511/hash-code-implementation
+    var h = 17
+    h = h * 37 + exprId.hashCode()
+    h = h * 37 + dataType.hashCode()
+    h
+  }
+
+  def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+
+  /**
+   * Returns a copy of this [[AttributeReference]] with changed nullability.
+   */
+  def withNullability(newNullability: Boolean) = {
+    if (nullable == newNullability) {
+      this
+    } else {
+      AttributeReference(name, dataType, newNullability)(exprId, qualifiers)
+    }
+  }
+
+  /**
+   * Returns a copy of this [[AttributeReference]] with new qualifiers.
+   */
+  def withQualifiers(newQualifiers: Seq[String]) = {
+    if (newQualifiers == qualifiers) {
+      this
+    } else {
+      AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
+    }
+  }
+
+  override def toString: String = s"$name#${exprId.id}$typeSuffix"
+}
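
A short sketch of why exprId, rather than name, drives attribute identity:

  val a = AttributeReference("a", IntegerType)()
  a == a.withQualifiers("t" :: Nil)  // true: equality compares only exprId and dataType
  a == a.newInstance                 // false: newInstance allocates a fresh exprId
  Alias(a, "c")().toAttribute        // an AttributeReference named "c" that keeps the alias's exprId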

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
new file mode 100644
index 0000000..e869a4d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.analysis.UnresolvedException
+
+case class Coalesce(children: Seq[Expression]) extends Expression {
+  type EvaluatedType = Any
+
+  /** Coalesce is nullable if all of its children are nullable, or if it has no children. */
+  def nullable = children.forall(_.nullable)
+
+  def references = children.flatMap(_.references).toSet
+  // Coalesce is foldable if all children are foldable.
+  override def foldable = children.forall(_.foldable)
+
+  // Only resolved if all the children are of the same type.
+  override lazy val resolved = childrenResolved && (children.map(_.dataType).distinct.size == 1)
+
+  override def toString = s"Coalesce(${children.mkString(",")})"
+
+  def dataType = if (resolved) {
+    children.head.dataType
+  } else {
+    throw new UnresolvedException(this, "Coalesce cannot have children of different types.")
+  }
+
+  override def apply(input: Row): Any = {
+    var i = 0
+    var result: Any = null
+    while (i < children.size && result == null) {
+      result = children(i).apply(input)
+      i += 1
+    }
+    result
+  }
+}
+
+case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
+  def references = child.references
+  override def foldable = child.foldable
+  def nullable = false
+
+  override def apply(input: Row): Any = {
+    child.apply(input) == null
+  }
+}
+
+case class IsNotNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
+  def references = child.references
+  override def foldable = child.foldable
+  def nullable = false
+  override def toString = s"IS NOT NULL $child"
+
+  override def apply(input: Row): Any = {
+    child.apply(input) != null
+  }
+}
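
A minimal sketch of the null semantics above (null stands in for SQL's unknown value; literals ignore the input row, so null is passed):

  val unknown = Literal(null, IntegerType)
  Coalesce(Seq(unknown, Literal(1), Literal(2))).apply(null)  // 1: the first non-null child wins
  IsNull(unknown).apply(null)                                 // true
  IsNotNull(Literal(1)).apply(null)                           // true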

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
new file mode 100644
index 0000000..76554e1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * A set of classes that can be used to represent trees of relational expressions.  A key goal of
+ * the expression library is to hide the details of naming and scoping from developers who want to
+ * manipulate trees of relational operators. As such, the library defines a special type of
+ * expression, a [[NamedExpression]] in addition to the standard collection of expressions.
+ *
+ * ==Standard Expressions==
+ * A library of standard expressions (e.g., [[Add]], [[Equals]]), aggregates (e.g., SUM, COUNT),
+ * and other computations (e.g. UDFs). Each expression type is capable of determining its output
+ * schema as a function of its children's output schema.
+ *
+ * ==Named Expressions==
+ * Some expressions are named and thus can be referenced by later operators in the dataflow graph.
+ * The two types of named expressions are [[AttributeReference]]s and [[Alias]]es.
+ * [[AttributeReference]]s refer to attributes of the input tuple for a given operator and form
+ * the leaves of some expression trees.  Aliases assign a name to intermediate computations.
+ * For example, in the SQL statement `SELECT a+b AS c FROM ...`, the expressions `a` and `b` would
+ * be represented by `AttributeReferences` and `c` would be represented by an `Alias`.
+ *
+ * During [[analysis]], all named expressions are assigned a globally unique expression id, which
+ * can be used for equality comparisons.  While the original names are kept around for debugging
+ * purposes, they should never be used to check if two attributes refer to the same value, as
+ * plan transformations can result in the introduction of naming ambiguity. For example, consider
+ * a plan that contains subqueries, both of which are reading from the same table.  If an
+ * optimization removes the subqueries, scoping information would be destroyed, eliminating the
+ * ability to reason about which subquery produced a given attribute.
+ *
+ * ==Evaluation==
+ * The result of expressions can be evaluated using the [[Evaluate]] object.
+ */
+package object expressions
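
As a concrete sketch of the `SELECT a+b AS c` example above (assuming the Add expression defined elsewhere in this change):

  val a = AttributeReference("a", IntegerType)()
  val b = AttributeReference("b", IntegerType)()
  val c = Alias(Add(a, b), "c")()  // a and b are leaf references; the Alias names the sum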

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
new file mode 100644
index 0000000..561396e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+import catalyst.analysis.UnresolvedException
+
+trait Predicate extends Expression {
+  self: Product =>
+
+  def dataType = BooleanType
+
+  type EvaluatedType = Any
+}
+
+trait PredicateHelper {
+  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
+    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+    case other => other :: Nil
+  }
+}
+
+abstract class BinaryPredicate extends BinaryExpression with Predicate {
+  self: Product =>
+  def nullable = left.nullable || right.nullable
+}
+
+case class Not(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
+  def references = child.references
+  override def foldable = child.foldable
+  def nullable = child.nullable
+  override def toString = s"NOT $child"
+
+  override def apply(input: Row): Any = {
+    child.apply(input) match {
+      case null => null
+      case b: Boolean => !b
+    }
+  }
+}
+
+/**
+ * Evaluates to `true` if `list` contains `value`.
+ */
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
+  def children = value +: list
+  def references = children.flatMap(_.references).toSet
+  def nullable = true // TODO: Figure out correct nullability semantics of IN.
+  override def toString = s"$value IN ${list.mkString("(", ",", ")")}"
+
+  override def apply(input: Row): Any = {
+    val evaluatedValue = value.apply(input)
+    list.exists(e => e.apply(input) == evaluatedValue)
+  }
+}
+
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+  def symbol = "&&"
+
+  override def apply(input: Row): Any = {
+    val l = left.apply(input)
+    val r = right.apply(input)
+    if (l == false || r == false) {
+      false
+    } else if (l == null || r == null) {
+      null
+    } else {
+      true
+    }
+  }
+}
+
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+  def symbol = "||"
+
+  override def apply(input: Row): Any = {
+    val l = left.apply(input)
+    val r = right.apply(input)
+    if (l == true || r == true) {
+      true
+    } else if (l == null || r == null) {
+      null
+    } else {
+      false
+    }
+  }
+}
+
+abstract class BinaryComparison extends BinaryPredicate {
+  self: Product =>
+}
+
+case class Equals(left: Expression, right: Expression) extends BinaryComparison {
+  def symbol = "="
+  override def apply(input: Row): Any = {
+    val l = left.apply(input)
+    val r = right.apply(input)
+    if (l == null || r == null) null else l == r
+  }
+}
+
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+  def symbol = "<"
+  override def apply(input: Row): Any = {
+    if (left.dataType == StringType && right.dataType == StringType) {
+      val l = left.apply(input)
+      val r = right.apply(input)
+      if (l == null || r == null) {
+        null
+      } else {
+        l.asInstanceOf[String] < r.asInstanceOf[String]
+      }
+    } else {
+      n2(input, left, right, _.lt(_, _))
+    }
+  }
+}
+
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  def symbol = "<="
+  override def apply(input: Row): Any = {
+    if (left.dataType == StringType && right.dataType == StringType) {
+      val l = left.apply(input)
+      val r = right.apply(input)
+      if (l == null || r == null) {
+        null
+      } else {
+        l.asInstanceOf[String] <= r.asInstanceOf[String]
+      }
+    } else {
+      n2(input, left, right, _.lteq(_, _))
+    }
+  }
+}
+
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+  def symbol = ">"
+  override def apply(input: Row): Any = {
+    if (left.dataType == StringType && right.dataType == StringType) {
+      val l = left.apply(input)
+      val r = right.apply(input)
+      if (l == null || r == null) {
+        null
+      } else {
+        l.asInstanceOf[String] > r.asInstanceOf[String]
+      }
+    } else {
+      n2(input, left, right, _.gt(_, _))
+    }
+  }
+}
+
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  def symbol = ">="
+  override def apply(input: Row): Any = {
+    if (left.dataType == StringType && right.dataType == StringType) {
+      val l = left.apply(input)
+      val r = right.apply(input)
+      if (l == null || r == null) {
+        null
+      } else {
+        l.asInstanceOf[String] >= r.asInstanceOf[String]
+      }
+    } else {
+      n2(input, left, right, _.gteq(_, _))
+    }
+  }
+}
+
+case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
+    extends Expression {
+
+  def children = predicate :: trueValue :: falseValue :: Nil
+  def nullable = trueValue.nullable || falseValue.nullable
+  def references = children.flatMap(_.references).toSet
+  override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
+  def dataType = {
+    if (!resolved) {
+      throw new UnresolvedException(
+        this,
+        s"Can not resolve due to differing types ${trueValue.dataType}, ${falseValue.dataType}")
+    }
+    trueValue.dataType
+  }
+
+  type EvaluatedType = Any
+  override def apply(input: Row): Any = {
+    if (predicate(input).asInstanceOf[Boolean]) {
+      trueValue.apply(input)
+    } else {
+      falseValue.apply(input)
+    }
+  }
+
+  override def toString = s"if ($predicate) $trueValue else $falseValue"
+}
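
A sketch of the three-valued logic these predicates implement (null plays the role of SQL's UNKNOWN):

  val unknown = Literal(null, BooleanType)
  And(unknown, Literal(false)).apply(null)  // false: a false conjunct decides the AND
  And(unknown, Literal(true)).apply(null)   // null: the unknown side could still be false
  Or(unknown, Literal(true)).apply(null)    // true: a true disjunct decides the OR
  Equals(Literal(1), Literal(null, IntegerType)).apply(null)  // null, not false

  // PredicateHelper.splitConjunctivePredicates flattens nested conjunctions:
  //   And(And(p1, p2), p3)  =>  Seq(p1, p2, p3)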

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
new file mode 100644
index 0000000..6e58523
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.types.BooleanType
+
+case class Like(left: Expression, right: Expression) extends BinaryExpression {
+  def dataType = BooleanType
+  def nullable = left.nullable // Right cannot be null.
+  def symbol = "LIKE"
+}
+