Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/21 02:04:50 UTC
[7/9] SPARK-1251 Support for optimizing and executing structured queries
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
new file mode 100644
index 0000000..c253587
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+import trees._
+
+/**
+ * Functions for attaching and retrieving trees that are associated with errors.
+ */
+package object errors {
+
+ class TreeNodeException[TreeType <: TreeNode[_]]
+ (tree: TreeType, msg: String, cause: Throwable) extends Exception(msg, cause) {
+
+ // Yes, this is the same as a default parameter, but... those don't seem to work with SBT
+ // external project dependencies for some reason.
+ def this(tree: TreeType, msg: String) = this(tree, msg, null)
+
+ override def getMessage: String = {
+ val treeString = tree.toString
+ s"${super.getMessage}, tree:${if (treeString contains "\n") "\n" else " "}$tree"
+ }
+ }
+
+ /**
+ * Wraps any exceptions that are thrown while executing `f` in a
+ * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.
+ */
+ def attachTree[TreeType <: TreeNode[_], A](tree: TreeType, msg: String = "")(f: => A): A = {
+ try f catch {
+ case e: Exception => throw new TreeNodeException(tree, msg, e)
+ }
+ }
+
+ /**
+ * Executes `f` which is expected to throw a
+ * [[catalyst.errors.TreeNodeException TreeNodeException]]. The first tree encountered in
+ * the stack of exceptions of type `TreeType` is returned.
+ */
+ def getTree[TreeType <: TreeNode[_]](f: => Unit): TreeType = ??? // TODO: Implement
+}
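
For illustration only (not part of the patch): a minimal sketch of attachTree in use. It assumes a Literal expression, whose companion object appears in literals.scala later in this change, as the tree being attached.

    import org.apache.spark.sql.catalyst.errors._
    import org.apache.spark.sql.catalyst.expressions.Literal

    // Any exception thrown inside the block is rethrown as a TreeNodeException
    // carrying the offending tree, so the failing fragment shows up in errors.
    try {
      attachTree(Literal(1), "dividing") { 1 / 0 }
    } catch {
      case e: TreeNodeException[_] =>
        println(e.getMessage) // "dividing" plus the string form of the attached tree
    }
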
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
new file mode 100644
index 0000000..3b6bac1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import rules._
+import errors._
+
+import catalyst.plans.QueryPlan
+
+/**
+ * A bound reference points to a specific slot in the input tuple, allowing the actual value
+ * to be retrieved more efficiently. However, since operations like column pruning can change
+ * the layout of intermediate tuples, BindReferences should be run after all such transformations.
+ */
+case class BoundReference(ordinal: Int, baseReference: Attribute)
+ extends Attribute with trees.LeafNode[Expression] {
+
+ type EvaluatedType = Any
+
+ def nullable = baseReference.nullable
+ def dataType = baseReference.dataType
+ def exprId = baseReference.exprId
+ def qualifiers = baseReference.qualifiers
+ def name = baseReference.name
+
+ def newInstance = BoundReference(ordinal, baseReference.newInstance)
+ def withQualifiers(newQualifiers: Seq[String]) =
+ BoundReference(ordinal, baseReference.withQualifiers(newQualifiers))
+
+ override def toString = s"$baseReference:$ordinal"
+
+ override def apply(input: Row): Any = input(ordinal)
+}
+
+class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
+ import BindReferences._
+
+ def apply(plan: TreeNode): TreeNode = {
+ plan.transform {
+ case leafNode if leafNode.children.isEmpty => leafNode
+ case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e =>
+ bindReference(e, unaryNode.children.head.output)
+ }
+ }
+ }
+}
+
+object BindReferences extends Logging {
+ def bindReference(expression: Expression, input: Seq[Attribute]): Expression = {
+ expression.transform { case a: AttributeReference =>
+ attachTree(a, "Binding attribute") {
+ val ordinal = input.indexWhere(_.exprId == a.exprId)
+ if (ordinal == -1) {
+ // TODO: This fallback is required because some operators (such as ScriptTransform)
+ // produce new attributes that can't be bound. Likely the right thing to do is remove
+ // this rule and require all operators to explicitly bind to the input schema that
+ // they specify.
+ logger.debug(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
+ a
+ } else {
+ BoundReference(ordinal, a)
+ }
+ }
+ }
+ }
+}
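
For illustration only (not part of the patch): a sketch of bindReference, assuming AttributeReference, Add, and Literal from other files in this change.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.{IntegerType, StringType}

    val a = AttributeReference("a", IntegerType, nullable = true)()
    val b = AttributeReference("b", StringType, nullable = true)()

    // `a` sits at position 1 of the input schema, so it is rewritten to
    // BoundReference(1, a); the Literal is left untouched.
    val bound = BindReferences.bindReference(Add(a, Literal(1)), Seq(b, a))
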
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
new file mode 100644
index 0000000..608656d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+/** Cast the child expression to the target data type. */
+case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
+ override def foldable = child.foldable
+ def nullable = child.nullable
+ override def toString = s"CAST($child, $dataType)"
+
+ type EvaluatedType = Any
+
+ lazy val castingFunction: Any => Any = (child.dataType, dataType) match {
+ case (BinaryType, StringType) => a: Any => new String(a.asInstanceOf[Array[Byte]])
+ case (StringType, BinaryType) => a: Any => a.asInstanceOf[String].getBytes
+ case (_, StringType) => a: Any => a.toString
+ case (StringType, IntegerType) => a: Any => castOrNull(a, _.toInt)
+ case (StringType, DoubleType) => a: Any => castOrNull(a, _.toDouble)
+ case (StringType, FloatType) => a: Any => castOrNull(a, _.toFloat)
+ case (StringType, LongType) => a: Any => castOrNull(a, _.toLong)
+ case (StringType, ShortType) => a: Any => castOrNull(a, _.toShort)
+ case (StringType, ByteType) => a: Any => castOrNull(a, _.toByte)
+ case (StringType, DecimalType) => a: Any => castOrNull(a, BigDecimal(_))
+ case (BooleanType, ByteType) => a: Any => a match {
+ case null => null
+ case true => 1.toByte
+ case false => 0.toByte
+ }
+ case (dt, IntegerType) =>
+ a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toInt(a)
+ case (dt, DoubleType) =>
+ a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toDouble(a)
+ case (dt, FloatType) =>
+ a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toFloat(a)
+ case (dt, LongType) =>
+ a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toLong(a)
+ case (dt, ShortType) =>
+ a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toInt(a).toShort
+ case (dt, ByteType) =>
+ a: Any => dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toInt(a).toByte
+ case (dt, DecimalType) =>
+ a: Any =>
+ BigDecimal(dt.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].toDouble(a))
+ }
+
+ @inline
+ protected def castOrNull[A](a: Any, f: String => A) =
+ try f(a.asInstanceOf[String]) catch {
+ case _: java.lang.NumberFormatException => null
+ }
+
+ override def apply(input: Row): Any = {
+ val evaluated = child.apply(input)
+ if (evaluated == null) {
+ null
+ } else {
+ castingFunction(evaluated)
+ }
+ }
+}
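
For illustration only (not part of the patch): the null-on-failure behaviour of string casts, using Literal and EmptyRow from other files in this change.

    import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, Literal}
    import org.apache.spark.sql.catalyst.types.IntegerType

    // String-to-int casts go through castOrNull, so malformed input yields
    // null instead of throwing a NumberFormatException.
    Cast(Literal("42"), IntegerType).apply(EmptyRow)            // 42
    Cast(Literal("not a number"), IntegerType).apply(EmptyRow)  // null
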
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
new file mode 100644
index 0000000..78aaaee
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import errors._
+import trees._
+import types._
+
+abstract class Expression extends TreeNode[Expression] {
+ self: Product =>
+
+ /** The narrowest possible type that is produced when this expression is evaluated. */
+ type EvaluatedType <: Any
+
+ def dataType: DataType
+
+ /**
+ * Returns true when an expression is a candidate for static evaluation before the query is
+ * executed.
+ *
+ * The following conditions are used to determine suitability for constant folding:
+ * - A [[expressions.Coalesce Coalesce]] is foldable if all of its children are foldable
+   *  - A [[expressions.BinaryExpression BinaryExpression]] is foldable if both its left and
+   *    right children are foldable
+ * - A [[expressions.Not Not]], [[expressions.IsNull IsNull]], or
+ * [[expressions.IsNotNull IsNotNull]] is foldable if its child is foldable.
+ * - A [[expressions.Literal]] is foldable.
+ * - A [[expressions.Cast Cast]] or [[expressions.UnaryMinus UnaryMinus]] is foldable if its
+ * child is foldable.
+ */
+  // TODO: Support more foldable expressions. For example, deterministic Hive UDFs.
+ def foldable: Boolean = false
+ def nullable: Boolean
+ def references: Set[Attribute]
+
+ /** Returns the result of evaluating this expression on a given input Row */
+ def apply(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
+ /**
+ * Returns `true` if this expression and all its children have been resolved to a specific schema
+   * and `false` if it still contains any unresolved placeholders. Implementations of expressions
+ * should override this if the resolution of this type of expression involves more than just
+ * the resolution of its children.
+ */
+ lazy val resolved: Boolean = childrenResolved
+
+ /**
+ * Returns true if all the children of this expression have been resolved to a specific schema
+   * and false if any of them still contain unresolved placeholders.
+ */
+ def childrenResolved = !children.exists(!_.resolved)
+
+ /**
+ * A set of helper functions that return the correct descendant of [[scala.math.Numeric]] type
+   * and do any casting necessary on the results of child evaluation.
+ */
+ @inline
+ def n1(e: Expression, i: Row, f: ((Numeric[Any], Any) => Any)): Any = {
+ val evalE = e.apply(i)
+ if (evalE == null) {
+ null
+ } else {
+ e.dataType match {
+ case n: NumericType =>
+ val castedFunction = f.asInstanceOf[(Numeric[n.JvmType], n.JvmType) => n.JvmType]
+ castedFunction(n.numeric, evalE.asInstanceOf[n.JvmType])
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+ }
+ }
+
+ @inline
+ protected final def n2(
+ i: Row,
+ e1: Expression,
+ e2: Expression,
+ f: ((Numeric[Any], Any, Any) => Any)): Any = {
+
+ if (e1.dataType != e2.dataType) {
+ throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
+ }
+
+ val evalE1 = e1.apply(i)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = e2.apply(i)
+ if (evalE2 == null) {
+ null
+ } else {
+ e1.dataType match {
+ case n: NumericType =>
+ f.asInstanceOf[(Numeric[n.JvmType], n.JvmType, n.JvmType) => Int](
+ n.numeric, evalE1.asInstanceOf[n.JvmType], evalE2.asInstanceOf[n.JvmType])
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+ }
+ }
+ }
+
+ @inline
+ protected final def f2(
+ i: Row,
+ e1: Expression,
+ e2: Expression,
+ f: ((Fractional[Any], Any, Any) => Any)): Any = {
+ if (e1.dataType != e2.dataType) {
+ throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
+ }
+
+ val evalE1 = e1.apply(i: Row)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = e2.apply(i: Row)
+ if (evalE2 == null) {
+ null
+ } else {
+ e1.dataType match {
+ case ft: FractionalType =>
+ f.asInstanceOf[(Fractional[ft.JvmType], ft.JvmType, ft.JvmType) => ft.JvmType](
+ ft.fractional, evalE1.asInstanceOf[ft.JvmType], evalE2.asInstanceOf[ft.JvmType])
+ case other => sys.error(s"Type $other does not support fractional operations")
+ }
+ }
+ }
+ }
+
+ @inline
+ protected final def i2(
+ i: Row,
+ e1: Expression,
+ e2: Expression,
+ f: ((Integral[Any], Any, Any) => Any)): Any = {
+ if (e1.dataType != e2.dataType) {
+ throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
+ }
+
+ val evalE1 = e1.apply(i)
+ if(evalE1 == null) {
+ null
+ } else {
+ val evalE2 = e2.apply(i)
+ if (evalE2 == null) {
+ null
+ } else {
+ e1.dataType match {
+ case i: IntegralType =>
+ f.asInstanceOf[(Integral[i.JvmType], i.JvmType, i.JvmType) => i.JvmType](
+ i.integral, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType])
+ case other => sys.error(s"Type $other does not support numeric operations")
+ }
+ }
+ }
+ }
+}
+
+abstract class BinaryExpression extends Expression with trees.BinaryNode[Expression] {
+ self: Product =>
+
+ def symbol: String
+
+ override def foldable = left.foldable && right.foldable
+
+ def references = left.references ++ right.references
+
+ override def toString = s"($left $symbol $right)"
+}
+
+abstract class LeafExpression extends Expression with trees.LeafNode[Expression] {
+ self: Product =>
+}
+
+abstract class UnaryExpression extends Expression with trees.UnaryNode[Expression] {
+ self: Product =>
+
+ def references = child.references
+}
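
For illustration only (not part of the patch): the type check and null propagation performed by the n1/n2 helpers, assuming Add and the two-argument Literal constructor from other files in this change.

    import org.apache.spark.sql.catalyst.expressions.{Add, EmptyRow, Literal}
    import org.apache.spark.sql.catalyst.types.IntegerType

    // A null on either side short-circuits evaluation to null.
    Add(Literal(1), Literal(null, IntegerType)).apply(EmptyRow)   // null

    // Mismatched child types fail fast:
    // Add(Literal(1), Literal(2L)).apply(EmptyRow)  // throws TreeNodeException: Types do not match
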
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
new file mode 100644
index 0000000..8c407d2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+package expressions
+
+/**
+ * Converts a [[Row]] to another Row given a sequence of expressions that define each column of
+ * the new row. If the schema of the input row is specified, then the given expressions will be
+ * bound to that schema.
+ */
+class Projection(expressions: Seq[Expression]) extends (Row => Row) {
+ def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+ this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+ protected val exprArray = expressions.toArray
+ def apply(input: Row): Row = {
+ val outputArray = new Array[Any](exprArray.size)
+ var i = 0
+ while (i < exprArray.size) {
+ outputArray(i) = exprArray(i).apply(input)
+ i += 1
+ }
+ new GenericRow(outputArray)
+ }
+}
+
+/**
+ * Converts a [[Row]] to another Row given a sequence of expressions that define each column of
+ * the new row. If the schema of the input row is specified, then the given expressions will be
+ * bound to that schema.
+ *
+ * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
+ * each time an input row is added. This significantly reduces the cost of calculating the
+ * projection, but means that it is not safe to hold on to a reference to the returned row.
+ */
+case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row) {
+ def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+ this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+ private[this] val exprArray = expressions.toArray
+ private[this] val mutableRow = new GenericMutableRow(exprArray.size)
+ def currentValue: Row = mutableRow
+
+ def apply(input: Row): Row = {
+ var i = 0
+ while (i < exprArray.size) {
+ mutableRow(i) = exprArray(i).apply(input)
+ i += 1
+ }
+ mutableRow
+ }
+}
+
+/**
+ * A mutable wrapper that makes two rows appear as a single concatenated row. Designed to
+ * be instantiated once per thread and reused.
+ */
+class JoinedRow extends Row {
+ private[this] var row1: Row = _
+ private[this] var row2: Row = _
+
+  /** Updates this JoinedRow to point at two new base rows. Returns itself. */
+ def apply(r1: Row, r2: Row): Row = {
+ row1 = r1
+ row2 = r2
+ this
+ }
+
+ def iterator = row1.iterator ++ row2.iterator
+
+ def length = row1.length + row2.length
+
+ def apply(i: Int) =
+ if (i < row1.size) row1(i) else row2(i - row1.size)
+
+ def isNullAt(i: Int) = apply(i) == null
+
+ def getInt(i: Int): Int =
+ if (i < row1.size) row1.getInt(i) else row2.getInt(i - row1.size)
+
+ def getLong(i: Int): Long =
+ if (i < row1.size) row1.getLong(i) else row2.getLong(i - row1.size)
+
+ def getDouble(i: Int): Double =
+ if (i < row1.size) row1.getDouble(i) else row2.getDouble(i - row1.size)
+
+ def getBoolean(i: Int): Boolean =
+ if (i < row1.size) row1.getBoolean(i) else row2.getBoolean(i - row1.size)
+
+ def getShort(i: Int): Short =
+ if (i < row1.size) row1.getShort(i) else row2.getShort(i - row1.size)
+
+ def getByte(i: Int): Byte =
+ if (i < row1.size) row1.getByte(i) else row2.getByte(i - row1.size)
+
+ def getFloat(i: Int): Float =
+ if (i < row1.size) row1.getFloat(i) else row2.getFloat(i - row1.size)
+
+ def getString(i: Int): String =
+ if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size)
+
+ def copy() = {
+ val totalSize = row1.size + row2.size
+ val copiedValues = new Array[Any](totalSize)
+ var i = 0
+ while(i < totalSize) {
+ copiedValues(i) = apply(i)
+ i += 1
+ }
+ new GenericRow(copiedValues)
+ }
+}
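
For illustration only (not part of the patch): a usage sketch for Projection, assuming AttributeReference, Add, and Literal from other files in this change. A MutableProjection built from the same expressions would return the same row object on every call.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.IntegerType

    val col = AttributeReference("a", IntegerType, nullable = true)()

    // Project column 0 of the input plus one into a fresh output row.
    val project = new Projection(Add(BoundReference(0, col), Literal(1)) :: Nil)
    project(new GenericRow(Array[Any](41)))   // [42]
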
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala
new file mode 100644
index 0000000..a5d0ecf
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Rand.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types.DoubleType
+
+case object Rand extends LeafExpression {
+ def dataType = DoubleType
+ def nullable = false
+ def references = Set.empty
+ override def toString = "RAND()"
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
new file mode 100644
index 0000000..3529675
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+/**
+ * Represents one row of output from a relational operator. Allows both generic access by ordinal,
+ * which will incur boxing overhead for primitives, as well as native primitive access.
+ *
+ * It is invalid to use the native primitive interface to retrieve a value that is null; instead, a
+ * user must check [[isNullAt]] before attempting to retrieve a value that might be null.
+ */
+trait Row extends Seq[Any] with Serializable {
+ def apply(i: Int): Any
+
+ def isNullAt(i: Int): Boolean
+
+ def getInt(i: Int): Int
+ def getLong(i: Int): Long
+ def getDouble(i: Int): Double
+ def getFloat(i: Int): Float
+ def getBoolean(i: Int): Boolean
+ def getShort(i: Int): Short
+ def getByte(i: Int): Byte
+ def getString(i: Int): String
+
+ override def toString() =
+ s"[${this.mkString(",")}]"
+
+ def copy(): Row
+}
+
+/**
+ * An extended interface to [[Row]] that allows the values for each column to be updated. Setting
+ * a value through a primitive function implicitly marks that column as not null.
+ */
+trait MutableRow extends Row {
+ def setNullAt(i: Int): Unit
+
+ def update(ordinal: Int, value: Any)
+
+ def setInt(ordinal: Int, value: Int)
+ def setLong(ordinal: Int, value: Long)
+ def setDouble(ordinal: Int, value: Double)
+ def setBoolean(ordinal: Int, value: Boolean)
+ def setShort(ordinal: Int, value: Short)
+ def setByte(ordinal: Int, value: Byte)
+ def setFloat(ordinal: Int, value: Float)
+ def setString(ordinal: Int, value: String)
+
+ /**
+ * EXPERIMENTAL
+ *
+   * Returns a mutable string builder for the specified column. The next call to getString for
+   * the same column should reflect the result of any mutations made to the returned buffer for
+   * that column.
+ */
+ def getStringBuilder(ordinal: Int): StringBuilder
+}
+
+/**
+ * A row with no data. Calling any methods will result in an error. Can be used as a placeholder.
+ */
+object EmptyRow extends Row {
+ def apply(i: Int): Any = throw new UnsupportedOperationException
+
+ def iterator = Iterator.empty
+ def length = 0
+ def isNullAt(i: Int): Boolean = throw new UnsupportedOperationException
+
+ def getInt(i: Int): Int = throw new UnsupportedOperationException
+ def getLong(i: Int): Long = throw new UnsupportedOperationException
+ def getDouble(i: Int): Double = throw new UnsupportedOperationException
+ def getFloat(i: Int): Float = throw new UnsupportedOperationException
+ def getBoolean(i: Int): Boolean = throw new UnsupportedOperationException
+ def getShort(i: Int): Short = throw new UnsupportedOperationException
+ def getByte(i: Int): Byte = throw new UnsupportedOperationException
+ def getString(i: Int): String = throw new UnsupportedOperationException
+
+ def copy() = this
+}
+
+/**
+ * A row implementation that uses an array of objects as the underlying storage. Note that, while
+ * the array is not copied, and thus could technically be mutated after creation, this is not
+ * allowed.
+ */
+class GenericRow(protected[catalyst] val values: Array[Any]) extends Row {
+ /** No-arg constructor for serialization. */
+ def this() = this(null)
+
+ def this(size: Int) = this(new Array[Any](size))
+
+ def iterator = values.iterator
+
+ def length = values.length
+
+ def apply(i: Int) = values(i)
+
+ def isNullAt(i: Int) = values(i) == null
+
+ def getInt(i: Int): Int = {
+ if (values(i) == null) sys.error("Failed to check null bit for primitive int value.")
+ values(i).asInstanceOf[Int]
+ }
+
+ def getLong(i: Int): Long = {
+ if (values(i) == null) sys.error("Failed to check null bit for primitive long value.")
+ values(i).asInstanceOf[Long]
+ }
+
+ def getDouble(i: Int): Double = {
+ if (values(i) == null) sys.error("Failed to check null bit for primitive double value.")
+ values(i).asInstanceOf[Double]
+ }
+
+ def getFloat(i: Int): Float = {
+ if (values(i) == null) sys.error("Failed to check null bit for primitive float value.")
+ values(i).asInstanceOf[Float]
+ }
+
+ def getBoolean(i: Int): Boolean = {
+ if (values(i) == null) sys.error("Failed to check null bit for primitive boolean value.")
+ values(i).asInstanceOf[Boolean]
+ }
+
+ def getShort(i: Int): Short = {
+ if (values(i) == null) sys.error("Failed to check null bit for primitive short value.")
+ values(i).asInstanceOf[Short]
+ }
+
+ def getByte(i: Int): Byte = {
+ if (values(i) == null) sys.error("Failed to check null bit for primitive byte value.")
+ values(i).asInstanceOf[Byte]
+ }
+
+ def getString(i: Int): String = {
+ if (values(i) == null) sys.error("Failed to check null bit for primitive String value.")
+ values(i).asInstanceOf[String]
+ }
+
+ def copy() = this
+}
+
+class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow {
+ /** No-arg constructor for serialization. */
+ def this() = this(0)
+
+ def getStringBuilder(ordinal: Int): StringBuilder = ???
+
+  override def setBoolean(ordinal: Int, value: Boolean): Unit = { values(ordinal) = value }
+  override def setByte(ordinal: Int, value: Byte): Unit = { values(ordinal) = value }
+  override def setDouble(ordinal: Int, value: Double): Unit = { values(ordinal) = value }
+  override def setFloat(ordinal: Int, value: Float): Unit = { values(ordinal) = value }
+  override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value }
+  override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value }
+  override def setString(ordinal: Int, value: String): Unit = { values(ordinal) = value }
+
+  override def setNullAt(i: Int): Unit = { values(i) = null }
+
+  override def setShort(ordinal: Int, value: Short): Unit = { values(ordinal) = value }
+
+  override def update(ordinal: Int, value: Any): Unit = { values(ordinal) = value }
+
+ override def copy() = new GenericRow(values.clone())
+}
+
+
+class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
+ def compare(a: Row, b: Row): Int = {
+ var i = 0
+ while (i < ordering.size) {
+ val order = ordering(i)
+ val left = order.child.apply(a)
+ val right = order.child.apply(b)
+
+ if (left == null && right == null) {
+ // Both null, continue looking.
+ } else if (left == null) {
+ return if (order.direction == Ascending) -1 else 1
+ } else if (right == null) {
+ return if (order.direction == Ascending) 1 else -1
+ } else {
+ val comparison = order.dataType match {
+ case n: NativeType if order.direction == Ascending =>
+ n.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
+ case n: NativeType if order.direction == Descending =>
+ n.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+ }
+ if (comparison != 0) return comparison
+ }
+ i += 1
+ }
+ return 0
+ }
+}
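
For illustration only (not part of the patch): RowOrdering in use, assuming AttributeReference from another file in this change.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.IntegerType

    val col = AttributeReference("a", IntegerType, nullable = true)()
    val byColAsc = new RowOrdering(SortOrder(BoundReference(0, col), Ascending) :: Nil)

    // Nulls sort first under Ascending, per the compare logic above.
    val rows = Seq(
      new GenericRow(Array[Any](3)),
      new GenericRow(Array[Any](null)),
      new GenericRow(Array[Any](1)))
    rows.sorted(byColAsc).map(_.head)   // List(null, 1, 3)
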
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
new file mode 100644
index 0000000..a3c7ca1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
+ extends Expression {
+
+ type EvaluatedType = Any
+
+ def references = children.flatMap(_.references).toSet
+ def nullable = true
+
+ override def apply(input: Row): Any = {
+ children.size match {
+ case 1 => function.asInstanceOf[(Any) => Any](children(0).apply(input))
+ case 2 =>
+ function.asInstanceOf[(Any, Any) => Any](
+ children(0).apply(input),
+ children(1).apply(input))
+ }
+ }
+}
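
For illustration only (not part of the patch): evaluating a two-argument ScalaUdf, assuming Literal and EmptyRow from other files in this change.

    import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Literal, ScalaUdf}
    import org.apache.spark.sql.catalyst.types.StringType

    // Children are evaluated against the input row, then passed positionally
    // to the wrapped Scala function.
    val concat = ScalaUdf(
      (a: Any, b: Any) => s"$a-$b",
      StringType,
      Literal("x") :: Literal(1) :: Nil)

    concat.apply(EmptyRow)   // "x-1"
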
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
new file mode 100644
index 0000000..171997b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+abstract sealed class SortDirection
+case object Ascending extends SortDirection
+case object Descending extends SortDirection
+
+/**
+ * An expression that can be used to sort a tuple. This class extends Expression primarily so that
+ * transformations over expressions will descend into its child.
+ */
+case class SortOrder(child: Expression, direction: SortDirection) extends UnaryExpression {
+ def dataType = child.dataType
+ def nullable = child.nullable
+ override def toString = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
new file mode 100644
index 0000000..2ad8d6f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import scala.language.dynamics
+
+import types._
+
+case object DynamicType extends DataType
+
+case class WrapDynamic(children: Seq[Attribute]) extends Expression {
+ type EvaluatedType = DynamicRow
+
+ def nullable = false
+ def references = children.toSet
+ def dataType = DynamicType
+
+ override def apply(input: Row): DynamicRow = input match {
+ // Avoid copy for generic rows.
+ case g: GenericRow => new DynamicRow(children, g.values)
+ case otherRowType => new DynamicRow(children, otherRowType.toArray)
+ }
+}
+
+class DynamicRow(val schema: Seq[Attribute], values: Array[Any])
+ extends GenericRow(values) with Dynamic {
+
+ def selectDynamic(attributeName: String): String = {
+ val ordinal = schema.indexWhere(_.name == attributeName)
+ values(ordinal).toString
+ }
+}
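
For illustration only (not part of the patch): the by-name access that WrapDynamic enables, assuming AttributeReference from another file in this change.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.{IntegerType, StringType}

    val schema = Seq(
      AttributeReference("name", StringType, nullable = true)(),
      AttributeReference("age", IntegerType, nullable = true)())

    val row = WrapDynamic(schema).apply(new GenericRow(Array[Any]("ann", 30)))

    // Columns are looked up by name via selectDynamic; values come back as Strings.
    row.name   // "ann"
    row.age    // "30"
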
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
new file mode 100644
index 0000000..2287a84
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.types._
+
+abstract class AggregateExpression extends Expression {
+ self: Product =>
+
+ /**
+ * Creates a new instance that can be used to compute this aggregate expression for a group
+   * of input rows.
+ */
+ def newInstance: AggregateFunction
+}
+
+/**
+ * Represents an aggregation that has been rewritten to be performed in two steps.
+ *
+ * @param finalEvaluation an aggregate expression that evaluates to the same final result as the
+ * original aggregation.
+ * @param partialEvaluations A sequence of [[NamedExpression]]s that can be computed on partial
+ * data sets and are required to compute the `finalEvaluation`.
+ */
+case class SplitEvaluation(
+ finalEvaluation: Expression,
+ partialEvaluations: Seq[NamedExpression])
+
+/**
+ * An [[AggregateExpression]] that can be partially computed without seeing all relevant tuples.
+ * These partial evaluations can then be combined to compute the actual answer.
+ */
+abstract class PartialAggregate extends AggregateExpression {
+ self: Product =>
+
+ /**
+ * Returns a [[SplitEvaluation]] that computes this aggregation using partial aggregation.
+ */
+ def asPartial: SplitEvaluation
+}
+
+/**
+ * A specific implementation of an aggregate function. Used to wrap a generic
+ * [[AggregateExpression]] with an algorithm that will be used to compute one specific result.
+ */
+abstract class AggregateFunction
+ extends AggregateExpression with Serializable with trees.LeafNode[Expression] {
+ self: Product =>
+
+ type EvaluatedType = Any
+
+ /** Base should return the generic aggregate expression that this function is computing */
+ val base: AggregateExpression
+ def references = base.references
+ def nullable = base.nullable
+ def dataType = base.dataType
+
+ def update(input: Row): Unit
+ override def apply(input: Row): Any
+
+ // Do we really need this?
+ def newInstance = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
+}
+
+case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+ def references = child.references
+ def nullable = false
+ def dataType = IntegerType
+ override def toString = s"COUNT($child)"
+
+ def asPartial: SplitEvaluation = {
+ val partialCount = Alias(Count(child), "PartialCount")()
+ SplitEvaluation(Sum(partialCount.toAttribute), partialCount :: Nil)
+ }
+
+ override def newInstance = new CountFunction(child, this)
+}
+
+case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpression {
+ def children = expressions
+ def references = expressions.flatMap(_.references).toSet
+ def nullable = false
+ def dataType = IntegerType
+  override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")})"
+ override def newInstance = new CountDistinctFunction(expressions, this)
+}
+
+case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+ def references = child.references
+ def nullable = false
+ def dataType = DoubleType
+ override def toString = s"AVG($child)"
+
+ override def asPartial: SplitEvaluation = {
+ val partialSum = Alias(Sum(child), "PartialSum")()
+ val partialCount = Alias(Count(child), "PartialCount")()
+ val castedSum = Cast(Sum(partialSum.toAttribute), dataType)
+ val castedCount = Cast(Sum(partialCount.toAttribute), dataType)
+
+ SplitEvaluation(
+ Divide(castedSum, castedCount),
+ partialCount :: partialSum :: Nil)
+ }
+
+ override def newInstance = new AverageFunction(child, this)
+}
+
+case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+ def references = child.references
+ def nullable = false
+ def dataType = child.dataType
+ override def toString = s"SUM($child)"
+
+ override def asPartial: SplitEvaluation = {
+ val partialSum = Alias(Sum(child), "PartialSum")()
+ SplitEvaluation(
+ Sum(partialSum.toAttribute),
+ partialSum :: Nil)
+ }
+
+ override def newInstance = new SumFunction(child, this)
+}
+
+case class SumDistinct(child: Expression)
+ extends AggregateExpression with trees.UnaryNode[Expression] {
+
+ def references = child.references
+ def nullable = false
+ def dataType = child.dataType
+ override def toString = s"SUM(DISTINCT $child)"
+
+ override def newInstance = new SumDistinctFunction(child, this)
+}
+
+case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+ def references = child.references
+ def nullable = child.nullable
+ def dataType = child.dataType
+ override def toString = s"FIRST($child)"
+
+ override def asPartial: SplitEvaluation = {
+ val partialFirst = Alias(First(child), "PartialFirst")()
+ SplitEvaluation(
+ First(partialFirst.toAttribute),
+ partialFirst :: Nil)
+ }
+ override def newInstance = new FirstFunction(child, this)
+}
+
+case class AverageFunction(expr: Expression, base: AggregateExpression)
+ extends AggregateFunction {
+
+ def this() = this(null, null) // Required for serialization.
+
+ private var count: Long = _
+ private val sum = MutableLiteral(Cast(Literal(0), expr.dataType).apply(EmptyRow))
+ private val sumAsDouble = Cast(sum, DoubleType)
+
+
+
+ private val addFunction = Add(sum, expr)
+
+ override def apply(input: Row): Any =
+ sumAsDouble.apply(EmptyRow).asInstanceOf[Double] / count.toDouble
+
+ def update(input: Row): Unit = {
+ count += 1
+ sum.update(addFunction, input)
+ }
+}
+
+case class CountFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
+ def this() = this(null, null) // Required for serialization.
+
+ var count: Int = _
+
+ def update(input: Row): Unit = {
+ val evaluatedExpr = expr.map(_.apply(input))
+ if (evaluatedExpr.map(_ != null).reduceLeft(_ || _)) {
+ count += 1
+ }
+ }
+
+ override def apply(input: Row): Any = count
+}
+
+case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
+ def this() = this(null, null) // Required for serialization.
+
+ private val sum = MutableLiteral(Cast(Literal(0), expr.dataType).apply(null))
+
+ private val addFunction = Add(sum, expr)
+
+ def update(input: Row): Unit = {
+ sum.update(addFunction, input)
+ }
+
+ override def apply(input: Row): Any = sum.apply(null)
+}
+
+case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
+ extends AggregateFunction {
+
+ def this() = this(null, null) // Required for serialization.
+
+ val seen = new scala.collection.mutable.HashSet[Any]()
+
+ def update(input: Row): Unit = {
+ val evaluatedExpr = expr.apply(input)
+ if (evaluatedExpr != null) {
+ seen += evaluatedExpr
+ }
+ }
+
+ override def apply(input: Row): Any =
+ seen.reduceLeft(base.dataType.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]].plus)
+}
+
+case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpression)
+ extends AggregateFunction {
+
+ def this() = this(null, null) // Required for serialization.
+
+ val seen = new scala.collection.mutable.HashSet[Any]()
+
+ def update(input: Row): Unit = {
+ val evaluatedExpr = expr.map(_.apply(input))
+ if (evaluatedExpr.map(_ != null).reduceLeft(_ && _)) {
+ seen += evaluatedExpr
+ }
+ }
+
+ override def apply(input: Row): Any = seen.size
+}
+
+case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
+ def this() = this(null, null) // Required for serialization.
+
+ var result: Any = null
+
+ def update(input: Row): Unit = {
+ if (result == null) {
+ result = expr.apply(input)
+ }
+ }
+
+ override def apply(input: Row): Any = result
+}
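
For illustration only (not part of the patch): driving one of these aggregate functions by hand, assuming AttributeReference and MutableLiteral behave as the other files in this change suggest.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.IntegerType

    val col = AttributeReference("x", IntegerType, nullable = true)()

    // newInstance gives a fresh, mutable accumulator for one group of rows:
    // update() folds rows in, apply() reads the current aggregate value out.
    val sum = Sum(BoundReference(0, col)).newInstance
    Seq(1, 2, 3).foreach(v => sum.update(new GenericRow(Array[Any](v))))
    sum.apply(null)   // 6
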
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
new file mode 100644
index 0000000..db23564
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.analysis.UnresolvedException
+import catalyst.types._
+
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+ type EvaluatedType = Any
+
+ def dataType = child.dataType
+ override def foldable = child.foldable
+ def nullable = child.nullable
+ override def toString = s"-$child"
+
+ override def apply(input: Row): Any = {
+ n1(child, input, _.negate(_))
+ }
+}
+
+abstract class BinaryArithmetic extends BinaryExpression {
+ self: Product =>
+
+ type EvaluatedType = Any
+
+ def nullable = left.nullable || right.nullable
+
+ override lazy val resolved =
+ left.resolved && right.resolved && left.dataType == right.dataType
+
+ def dataType = {
+ if (!resolved) {
+ throw new UnresolvedException(this,
+ s"datatype. Can not resolve due to differing types ${left.dataType}, ${right.dataType}")
+ }
+ left.dataType
+ }
+}
+
+case class Add(left: Expression, right: Expression) extends BinaryArithmetic {
+ def symbol = "+"
+
+ override def apply(input: Row): Any = n2(input, left, right, _.plus(_, _))
+}
+
+case class Subtract(left: Expression, right: Expression) extends BinaryArithmetic {
+ def symbol = "-"
+
+ override def apply(input: Row): Any = n2(input, left, right, _.minus(_, _))
+}
+
+case class Multiply(left: Expression, right: Expression) extends BinaryArithmetic {
+ def symbol = "*"
+
+ override def apply(input: Row): Any = n2(input, left, right, _.times(_, _))
+}
+
+case class Divide(left: Expression, right: Expression) extends BinaryArithmetic {
+ def symbol = "/"
+
+ override def apply(input: Row): Any = dataType match {
+ case _: FractionalType => f2(input, left, right, _.div(_, _))
+ case _: IntegralType => i2(input, left , right, _.quot(_, _))
+ }
+
+}
+
+case class Remainder(left: Expression, right: Expression) extends BinaryArithmetic {
+ def symbol = "%"
+
+ override def apply(input: Row): Any = i2(input, left, right, _.rem(_, _))
+}
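
For illustration only (not part of the patch): how Divide picks between the integral and fractional helpers, assuming Literal from literals.scala and EmptyRow from Row.scala in this change.

    import org.apache.spark.sql.catalyst.expressions.{Divide, EmptyRow, Literal, Remainder}

    // Integer operands resolve to an IntegralType, so division routes through
    // i2 (Integral.quot); double operands route through f2 (Fractional.div).
    Divide(Literal(7), Literal(2)).apply(EmptyRow)      // 3
    Remainder(Literal(7), Literal(2)).apply(EmptyRow)   // 1
    Divide(Literal(7.0), Literal(2.0)).apply(EmptyRow)  // 3.5
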
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
new file mode 100644
index 0000000..d3feb6c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+/**
+ * Returns the item at `ordinal` in the Array `child`, or the value for the key `ordinal` in the Map `child`.
+ */
+case class GetItem(child: Expression, ordinal: Expression) extends Expression {
+ type EvaluatedType = Any
+
+ val children = child :: ordinal :: Nil
+ /** `Null` is returned for invalid ordinals. */
+ override def nullable = true
+ override def references = children.flatMap(_.references).toSet
+ def dataType = child.dataType match {
+ case ArrayType(dt) => dt
+ case MapType(_, vt) => vt
+ }
+ override lazy val resolved =
+ childrenResolved &&
+ (child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType])
+
+ override def toString = s"$child[$ordinal]"
+
+ override def apply(input: Row): Any = {
+ if (child.dataType.isInstanceOf[ArrayType]) {
+ val baseValue = child.apply(input).asInstanceOf[Seq[_]]
+ val o = ordinal.apply(input).asInstanceOf[Int]
+ if (baseValue == null) {
+ null
+ } else if (o >= baseValue.size || o < 0) {
+ null
+ } else {
+ baseValue(o)
+ }
+ } else {
+ val baseValue = child.apply(input).asInstanceOf[Map[Any, _]]
+ val key = ordinal.apply(input)
+ if (baseValue == null) {
+ null
+ } else {
+ baseValue.get(key).orNull
+ }
+ }
+ }
+}
+
+/**
+ * Returns the value of the field named `fieldName` in the Struct `child`.
+ */
+case class GetField(child: Expression, fieldName: String) extends UnaryExpression {
+ type EvaluatedType = Any
+
+ def dataType = field.dataType
+ def nullable = field.nullable
+
+ protected def structType = child.dataType match {
+ case s: StructType => s
+ case otherType => sys.error(s"GetField is not valid on fields of type $otherType")
+ }
+
+ lazy val field =
+ structType.fields
+ .find(_.name == fieldName)
+ .getOrElse(sys.error(s"No such field $fieldName in ${child.dataType}"))
+
+ lazy val ordinal = structType.fields.indexOf(field)
+
+ override lazy val resolved = childrenResolved && child.dataType.isInstanceOf[StructType]
+
+ override def apply(input: Row): Any = {
+ val baseValue = child.apply(input).asInstanceOf[Row]
+ if (baseValue == null) null else baseValue(ordinal)
+ }
+
+ override def toString = s"$child.$fieldName"
+}
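
For illustration only (not part of the patch): GetItem on an array-typed column, assuming AttributeReference from another file in this change.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.{ArrayType, IntegerType}

    val xs = AttributeReference("xs", ArrayType(IntegerType), nullable = true)()
    val row = new GenericRow(Array[Any](Seq(10, 20, 30)))

    // In-range ordinals index into the array; out-of-range ordinals give null.
    GetItem(BoundReference(0, xs), Literal(1)).apply(row)   // 20
    GetItem(BoundReference(0, xs), Literal(5)).apply(row)   // null
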
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
new file mode 100644
index 0000000..c367de2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.types._
+
+/**
+ * An expression that produces zero or more rows given a single input row.
+ *
+ * Generators produce multiple output rows instead of a single value like other expressions,
+ * and thus they must have a schema to associate with the rows that are output.
+ *
+ * However, unlike row-producing relational operators, which are either leaves or determine their
+ * output schema functionally from their input, generators can contain other expressions that
+ * might result in their modification by rules. This structure means that they might be copied
+ * multiple times after first determining their output schema. If a new output schema were created
+ * for each copy, references up the tree might be rendered invalid. As a result, generators must
+ * instead define a function `makeOutput`, which is called only once when the schema is first
+ * requested. The attributes produced by this function will be automatically copied anytime rules
+ * result in changes to the Generator or its children.
+ */
+abstract class Generator extends Expression with (Row => TraversableOnce[Row]) {
+ self: Product =>
+
+ type EvaluatedType = TraversableOnce[Row]
+
+ lazy val dataType =
+ ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable))))
+
+ def nullable = false
+
+ def references = children.flatMap(_.references).toSet
+
+ /**
+ * Should be overridden by specific generators. Called only once for each instance to ensure
+ * that rule application does not change the output schema of a generator.
+ */
+ protected def makeOutput(): Seq[Attribute]
+
+ private var _output: Seq[Attribute] = null
+
+ def output: Seq[Attribute] = {
+ if (_output == null) {
+ _output = makeOutput()
+ }
+ _output
+ }
+
+ /** Should be implemented by concrete generators to produce the output rows for an input row. */
+ def apply(input: Row): TraversableOnce[Row]
+
+ /** Overridden `makeCopy` also copies the attributes that are produced by this generator. */
+ override def makeCopy(newArgs: Array[AnyRef]): this.type = {
+ val copy = super.makeCopy(newArgs)
+ copy._output = _output
+ copy
+ }
+}
+
+/**
+ * Given an input array produces a sequence of rows for each value in the array.
+ */
+case class Explode(attributeNames: Seq[String], child: Expression)
+ extends Generator with trees.UnaryNode[Expression] {
+
+ override lazy val resolved =
+ child.resolved &&
+ (child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType])
+
+ lazy val elementTypes = child.dataType match {
+ case ArrayType(et) => et :: Nil
+ case MapType(kt,vt) => kt :: vt :: Nil
+ }
+
+ // TODO: Move this pattern into Generator.
+ protected def makeOutput() =
+ if (attributeNames.size == elementTypes.size) {
+ attributeNames.zip(elementTypes).map {
+ case (n, t) => AttributeReference(n, t, nullable = true)()
+ }
+ } else {
+ elementTypes.zipWithIndex.map {
+ case (t, i) => AttributeReference(s"c_$i", t, nullable = true)()
+ }
+ }
+
+ override def apply(input: Row): TraversableOnce[Row] = {
+ child.dataType match {
+ case ArrayType(_) =>
+ val inputArray = child.apply(input).asInstanceOf[Seq[Any]]
+ if (inputArray == null) Nil else inputArray.map(v => new GenericRow(Array(v)))
+ case MapType(_, _) =>
+ val inputMap = child.apply(input).asInstanceOf[Map[Any,Any]]
+ if (inputMap == null) Nil else inputMap.map { case (k,v) => new GenericRow(Array(k,v)) }
+ }
+ }
+
+ override def toString() = s"explode($child)"
+}
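As a rough illustration of the Explode generator added above (a sketch only, assuming the Literal, ArrayType, IntegerType, and GenericRow definitions found elsewhere in this patch), an array-typed child yields one single-column output row per element:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types._

    // The child is a literal array, so it evaluates without being bound to an input schema.
    val arrayChild = Literal(Seq(1, 2, 3), ArrayType(IntegerType))
    val explode = Explode(Nil, arrayChild)

    // No attribute names were supplied, so makeOutput() generates c_0 for the single element type.
    // Applying the generator to any input row yields one single-column row per array element.
    val rows = explode.apply(new GenericRow(Array[Any]())).toSeq   // three rows: 1, 2, 3

Because makeOutput() is only invoked the first time output is requested, the c_0 attribute above keeps the same expression id even if later rules copy the Explode node.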
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
new file mode 100644
index 0000000..229d8f7
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+
+object Literal {
+ def apply(v: Any): Literal = v match {
+ case i: Int => Literal(i, IntegerType)
+ case l: Long => Literal(l, LongType)
+ case d: Double => Literal(d, DoubleType)
+ case f: Float => Literal(f, FloatType)
+ case b: Byte => Literal(b, ByteType)
+ case s: Short => Literal(s, ShortType)
+ case s: String => Literal(s, StringType)
+ case b: Boolean => Literal(b, BooleanType)
+ case null => Literal(null, NullType)
+ }
+}
+
+/**
+ * Extractor for retrieving Int literals.
+ */
+object IntegerLiteral {
+ def unapply(a: Any): Option[Int] = a match {
+ case Literal(a: Int, IntegerType) => Some(a)
+ case _ => None
+ }
+}
+
+case class Literal(value: Any, dataType: DataType) extends LeafExpression {
+
+ override def foldable = true
+ def nullable = value == null
+ def references = Set.empty
+
+ override def toString = if (value != null) value.toString else "null"
+
+ type EvaluatedType = Any
+ override def apply(input: Row):Any = value
+}
+
+// TODO: Specialize
+case class MutableLiteral(var value: Any, nullable: Boolean = true) extends LeafExpression {
+ type EvaluatedType = Any
+
+ val dataType = Literal(value).dataType
+
+ def references = Set.empty
+
+ def update(expression: Expression, input: Row) = {
+ value = expression.apply(input)
+ }
+
+ override def apply(input: Row) = value
+}
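For reference, a small sketch of how the factory above is used (only the definitions in this file are assumed): the companion object infers the DataType from the runtime class of the value, and IntegerLiteral pattern-matches Int literals, which is convenient when writing optimizer rules:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types._

    val one  = Literal(1)        // Literal(1, IntegerType)
    val name = Literal("joe")    // Literal("joe", StringType)
    val none = Literal(null)     // Literal(null, NullType), nullable = true

    // Extract the Int value only when both the value and its type line up.
    one match {
      case IntegerLiteral(i) => println(s"int literal: $i")
      case _                 => println("not an int literal")
    }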
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
new file mode 100644
index 0000000..0a06e85
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.analysis.UnresolvedAttribute
+import types._
+
+object NamedExpression {
+ private val curId = new java.util.concurrent.atomic.AtomicLong()
+ def newExprId = ExprId(curId.getAndIncrement())
+}
+
+/**
+ * A globally unique (within this JVM) id for a given named expression.
+ * Used to identify which attribute output by a relation is being
+ * referenced in a subsequent computation.
+ */
+case class ExprId(id: Long)
+
+abstract class NamedExpression extends Expression {
+ self: Product =>
+
+ def name: String
+ def exprId: ExprId
+ def qualifiers: Seq[String]
+
+ def toAttribute: Attribute
+
+ protected def typeSuffix =
+ if (resolved) {
+ dataType match {
+ case LongType => "L"
+ case _ => ""
+ }
+ } else {
+ ""
+ }
+}
+
+abstract class Attribute extends NamedExpression {
+ self: Product =>
+
+ def withQualifiers(newQualifiers: Seq[String]): Attribute
+
+ def references = Set(this)
+ def toAttribute = this
+ def newInstance: Attribute
+}
+
+/**
+ * Used to assign a new name to a computation.
+ * For example, the SQL expression "1 + 1 AS a" could be represented as follows:
+ * Alias(Add(Literal(1), Literal(1)), "a")()
+ *
+ * @param child the computation being performed
+ * @param name the name to be associated with the result of computing [[child]].
+ * @param exprId A globally unique id used to check if an [[AttributeReference]] refers to this
+ * alias. Auto-assigned if left blank.
+ */
+case class Alias(child: Expression, name: String)
+ (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
+ extends NamedExpression with trees.UnaryNode[Expression] {
+
+ type EvaluatedType = Any
+
+ override def apply(input: Row) = child.apply(input)
+
+ def dataType = child.dataType
+ def nullable = child.nullable
+ def references = child.references
+
+ def toAttribute = {
+ if (resolved) {
+ AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
+ } else {
+ UnresolvedAttribute(name)
+ }
+ }
+
+ override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix"
+
+ override protected final def otherCopyArgs = exprId :: qualifiers :: Nil
+}
+
+/**
+ * A reference to an attribute produced by another operator in the tree.
+ *
+ * @param name The name of this attribute, should only be used during analysis or for debugging.
+ * @param dataType The [[types.DataType DataType]] of this attribute.
+ * @param nullable True if null is a valid value for this attribute.
+ * @param exprId A globally unique id used to check if different AttributeReferences refer to the
+ * same attribute.
+ * @param qualifiers a list of strings that can be used to refer to this attribute in a fully
+ * qualified way. Consider the examples tableName.name and subQueryAlias.name;
+ * tableName and subQueryAlias are possible qualifiers.
+ */
+case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
+ (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
+ extends Attribute with trees.LeafNode[Expression] {
+
+ override def equals(other: Any) = other match {
+ case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
+ case _ => false
+ }
+
+ override def hashCode: Int = {
+ // See http://stackoverflow.com/questions/113511/hash-code-implementation
+ var h = 17
+ h = h * 37 + exprId.hashCode()
+ h = h * 37 + dataType.hashCode()
+ h
+ }
+
+ def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+
+ /**
+ * Returns a copy of this [[AttributeReference]] with changed nullability.
+ */
+ def withNullability(newNullability: Boolean) = {
+ if (nullable == newNullability) {
+ this
+ } else {
+ AttributeReference(name, dataType, newNullability)(exprId, qualifiers)
+ }
+ }
+
+ /**
+ * Returns a copy of this [[AttributeReference]] with new qualifiers.
+ */
+ def withQualifiers(newQualifiers: Seq[String]) = {
+ if (newQualifiers == qualifiers) {
+ this
+ } else {
+ AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
+ }
+ }
+
+ override def toString: String = s"$name#${exprId.id}$typeSuffix"
+}
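A short sketch of the exprId machinery above (assuming only Literal and the types from this patch): AttributeReference equality is defined by exprId and dataType rather than by name, and newInstance mints a fresh id:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types._

    val a = AttributeReference("a", IntegerType, nullable = false)()

    // A reference with a different name but the same exprId still denotes the same attribute.
    val sameAttribute = AttributeReference("renamed", IntegerType)(exprId = a.exprId)
    assert(a == sameAttribute)

    // newInstance assigns a fresh exprId, so the result is a different attribute.
    assert(a != a.newInstance)

    // Aliases name intermediate computations and resolve to an AttributeReference.
    val aliased = Alias(Literal(1), "one")()
    val asAttribute = aliased.toAttribute   // AttributeReference("one", IntegerType, nullable = false)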
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
new file mode 100644
index 0000000..e869a4d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.analysis.UnresolvedException
+
+case class Coalesce(children: Seq[Expression]) extends Expression {
+ type EvaluatedType = Any
+
+ /** Coalesce is nullable if all of its children are nullable, or if it has no children. */
+ def nullable = !children.exists(!_.nullable)
+
+ def references = children.flatMap(_.references).toSet
+ // Coalesce is foldable if all children are foldable.
+ override def foldable = !children.exists(!_.foldable)
+
+ // Only resolved if all the children are of the same type.
+ override lazy val resolved = childrenResolved && (children.map(_.dataType).distinct.size == 1)
+
+ override def toString = s"Coalesce(${children.mkString(",")})"
+
+ def dataType = if (resolved) {
+ children.head.dataType
+ } else {
+ throw new UnresolvedException(this, "Coalesce cannot have children of different types.")
+ }
+
+ override def apply(input: Row): Any = {
+ var i = 0
+ var result: Any = null
+ while(i < children.size && result == null) {
+ result = children(i).apply(input)
+ i += 1
+ }
+ result
+ }
+}
+
+case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
+ def references = child.references
+ override def foldable = child.foldable
+ def nullable = false
+
+ override def apply(input: Row): Any = {
+ child.apply(input) == null
+ }
+}
+
+case class IsNotNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
+ def references = child.references
+ override def foldable = child.foldable
+ def nullable = false
+ override def toString = s"IS NOT NULL $child"
+
+ override def apply(input: Row): Any = {
+ child.apply(input) != null
+ }
+}
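A minimal sketch of the null-handling expressions above (using only Literal from this patch): Coalesce walks its children left to right and returns the first non-null value, while IsNull and IsNotNull never themselves return null:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types._

    // Literals ignore their input row, so null is passed here purely for illustration.
    val firstNonNull = Coalesce(Seq(Literal(null, IntegerType), Literal(2), Literal(3)))
    firstNonNull.apply(null)                           // 2

    IsNull(Literal(null, IntegerType)).apply(null)     // true
    IsNotNull(Literal("x")).apply(null)                // true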
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
new file mode 100644
index 0000000..76554e1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * A set of classes that can be used to represent trees of relational expressions. A key goal of
+ * the expression library is to hide the details of naming and scoping from developers who want to
+ * manipulate trees of relational operators. As such, the library defines a special type of
+ * expression, a [[NamedExpression]] in addition to the standard collection of expressions.
+ *
+ * ==Standard Expressions==
+ * A library of standard expressions (e.g., [[Add]], [[Equals]]), aggregates (e.g., SUM, COUNT),
+ * and other computations (e.g. UDFs). Each expression type is capable of determining its output
+ * schema as a function of its children's output schema.
+ *
+ * ==Named Expressions==
+ * Some expressions are named and thus can be referenced by later operators in the dataflow graph.
+ * The two types of named expressions are [[AttributeReference]]s and [[Alias]]es.
+ * [[AttributeReference]]s refer to attributes of the input tuple for a given operator and form
+ * the leaves of some expression trees. Aliases assign a name to intermediate computations.
+ * For example, in the SQL statement `SELECT a+b AS c FROM ...`, the expressions `a` and `b` would
+ * be represented by `AttributeReferences` and `c` would be represented by an `Alias`.
+ *
+ * During [[analysis]], all named expressions are assigned a globally unique expression id, which
+ * can be used for equality comparisons. While the original names are kept around for debugging
+ * purposes, they should never be used to check if two attributes refer to the same value, as
+ * plan transformations can result in the introduction of naming ambiguity. For example, consider
+ * a plan that contains subqueries, both of which are reading from the same table. If an
+ * optimization removes the subqueries, scoping information would be destroyed, eliminating the
+ * ability to reason about which subquery produced a given attribute.
+ *
+ * ==Evaluation==
+ * The result of expressions can be evaluated using the [[Evaluate]] object.
+ */
+package object expressions
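As a concrete sketch of the `SELECT a+b AS c` example from the comment above (the exact Add constructor lives in the arithmetic expressions elsewhere in this patch and is assumed here to take a left and right child), the expression tree would look roughly like:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types._

    // a and b are attributes of the input relation; c names the intermediate computation.
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    val c = Alias(Add(a, b), "c")()   // assumed: Add(left, right) from the arithmetic expressions

    // Later operators refer to the result via c.toAttribute, whose exprId survives renaming.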
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
new file mode 100644
index 0000000..561396e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import types._
+import catalyst.analysis.UnresolvedException
+
+trait Predicate extends Expression {
+ self: Product =>
+
+ def dataType = BooleanType
+
+ type EvaluatedType = Any
+}
+
+trait PredicateHelper {
+ def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
+ case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+ case other => other :: Nil
+ }
+}
+
+abstract class BinaryPredicate extends BinaryExpression with Predicate {
+ self: Product =>
+ def nullable = left.nullable || right.nullable
+}
+
+case class Not(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
+ def references = child.references
+ override def foldable = child.foldable
+ def nullable = child.nullable
+ override def toString = s"NOT $child"
+
+ override def apply(input: Row): Any = {
+ child.apply(input) match {
+ case null => null
+ case b: Boolean => !b
+ }
+ }
+}
+
+/**
+ * Evaluates to `true` if `list` contains `value`.
+ */
+case class In(value: Expression, list: Seq[Expression]) extends Predicate {
+ def children = value +: list
+ def references = children.flatMap(_.references).toSet
+ def nullable = true // TODO: Figure out correct nullability semantics of IN.
+ override def toString = s"$value IN ${list.mkString("(", ",", ")")}"
+
+ override def apply(input: Row): Any = {
+ val evaluatedValue = value.apply(input)
+ list.exists(e => e.apply(input) == evaluatedValue)
+ }
+}
+
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+ def symbol = "&&"
+
+ override def apply(input: Row): Any = {
+ val l = left.apply(input)
+ val r = right.apply(input)
+ if (l == false || r == false) {
+ false
+ } else if (l == null || r == null ) {
+ null
+ } else {
+ true
+ }
+ }
+}
+
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+ def symbol = "||"
+
+ override def apply(input: Row): Any = {
+ val l = left.apply(input)
+ val r = right.apply(input)
+ if (l == true || r == true) {
+ true
+ } else if (l == null || r == null) {
+ null
+ } else {
+ false
+ }
+ }
+}
+
+abstract class BinaryComparison extends BinaryPredicate {
+ self: Product =>
+}
+
+case class Equals(left: Expression, right: Expression) extends BinaryComparison {
+ def symbol = "="
+ override def apply(input: Row): Any = {
+ val l = left.apply(input)
+ val r = right.apply(input)
+ if (l == null || r == null) null else l == r
+ }
+}
+
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+ def symbol = "<"
+ override def apply(input: Row): Any = {
+ if (left.dataType == StringType && right.dataType == StringType) {
+ val l = left.apply(input)
+ val r = right.apply(input)
+ if(l == null || r == null) {
+ null
+ } else {
+ l.asInstanceOf[String] < r.asInstanceOf[String]
+ }
+ } else {
+ n2(input, left, right, _.lt(_, _))
+ }
+ }
+}
+
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+ def symbol = "<="
+ override def apply(input: Row): Any = {
+ if (left.dataType == StringType && right.dataType == StringType) {
+ val l = left.apply(input)
+ val r = right.apply(input)
+ if(l == null || r == null) {
+ null
+ } else {
+ l.asInstanceOf[String] <= r.asInstanceOf[String]
+ }
+ } else {
+ n2(input, left, right, _.lteq(_, _))
+ }
+ }
+}
+
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+ def symbol = ">"
+ override def apply(input: Row): Any = {
+ if (left.dataType == StringType && right.dataType == StringType) {
+ val l = left.apply(input)
+ val r = right.apply(input)
+ if(l == null || r == null) {
+ null
+ } else {
+ l.asInstanceOf[String] > r.asInstanceOf[String]
+ }
+ } else {
+ n2(input, left, right, _.gt(_, _))
+ }
+ }
+}
+
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+ def symbol = ">="
+ override def apply(input: Row): Any = {
+ if (left.dataType == StringType && right.dataType == StringType) {
+ val l = left.apply(input)
+ val r = right.apply(input)
+ if(l == null || r == null) {
+ null
+ } else {
+ l.asInstanceOf[String] >= r.asInstanceOf[String]
+ }
+ } else {
+ n2(input, left, right, _.gteq(_, _))
+ }
+ }
+}
+
+case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
+ extends Expression {
+
+ def children = predicate :: trueValue :: falseValue :: Nil
+ def nullable = trueValue.nullable || falseValue.nullable
+ def references = children.flatMap(_.references).toSet
+ override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
+ def dataType = {
+ if (!resolved) {
+ throw new UnresolvedException(
+ this,
+ s"Can not resolve due to differing types ${trueValue.dataType}, ${falseValue.dataType}")
+ }
+ trueValue.dataType
+ }
+
+ type EvaluatedType = Any
+ override def apply(input: Row): Any = {
+ if (predicate(input).asInstanceOf[Boolean]) {
+ trueValue.apply(input)
+ } else {
+ falseValue.apply(input)
+ }
+ }
+
+ override def toString = s"if ($predicate) $trueValue else $falseValue"
+}
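A brief sketch of the three-valued boolean semantics implemented by And and Or above (only Literal and BooleanType from this patch are assumed): a definite false short-circuits And and a definite true short-circuits Or, while an unknown (null) operand otherwise propagates:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types._

    val unknown = Literal(null, BooleanType)   // an operand whose value is not known

    // Literals ignore the input row, so null is passed purely for illustration.
    And(Literal(false), unknown).apply(null)   // false: one side is definitely false
    And(Literal(true),  unknown).apply(null)   // null:  the result is unknown
    Or(Literal(true),   unknown).apply(null)   // true:  one side is definitely true
    Or(Literal(false),  unknown).apply(null)   // null:  the result is unknown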
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
new file mode 100644
index 0000000..6e58523
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package expressions
+
+import catalyst.types.BooleanType
+
+case class Like(left: Expression, right: Expression) extends BinaryExpression {
+ def dataType = BooleanType
+ def nullable = left.nullable // Right cannot be null.
+ def symbol = "LIKE"
+}
+