Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/21 02:04:49 UTC

[6/9] SPARK-1251 Support for optimizing and executing structured queries

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
new file mode 100644
index 0000000..4db2803
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package optimizer
+
+import catalyst.expressions._
+import catalyst.plans.logical._
+import catalyst.rules._
+import catalyst.types.BooleanType
+import catalyst.plans.Inner
+
+object Optimizer extends RuleExecutor[LogicalPlan] {
+  val batches =
+    Batch("Subqueries", Once,
+      EliminateSubqueries) ::
+    Batch("ConstantFolding", Once,
+      ConstantFolding,
+      BooleanSimplification,
+      SimplifyCasts) ::
+    Batch("Filter Pushdown", Once,
+      EliminateSubqueries,
+      CombineFilters,
+      PushPredicateThroughProject,
+      PushPredicateThroughInnerJoin) :: Nil
+}
+
+/**
+ * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan.  Subqueries are
+ * only required to provide scoping information for attributes and can be removed once analysis is
+ * complete.
+ */
+object EliminateSubqueries extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Subquery(_, child) => child
+  }
+}
+
+/**
+ * Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
+ * equivalent [[catalyst.expressions.Literal Literal]] values.
+ */
+object ConstantFolding extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case q: LogicalPlan => q transformExpressionsDown {
+      // Skip redundant folding of literals.
+      case l: Literal => l
+      case e if e.foldable => Literal(e.apply(null), e.dataType)
+    }
+  }
+}
+
+/**
+ * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
+ * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
+ * is only safe when expression evaluation does not result in side effects.
+ */
+object BooleanSimplification extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case q: LogicalPlan => q transformExpressionsUp {
+      case and @ And(left, right) => {
+        (left, right) match {
+          case (Literal(true, BooleanType), r) => r
+          case (l, Literal(true, BooleanType)) => l
+          case (Literal(false, BooleanType), _) => Literal(false)
+          case (_, Literal(false, BooleanType)) => Literal(false)
+          case (_, _) => and
+        }
+      }
+      case or @ Or(left, right) => {
+        (left, right) match {
+          case (Literal(true, BooleanType), _) => Literal(true)
+          case (_, Literal(true, BooleanType)) => Literal(true)
+          case (Literal(false, BooleanType), r) => r
+          case (l, Literal(false, BooleanType)) => l
+          case (_, _) => or
+        }
+      }
+    }
+  }
+}
+
+/**
+ * Combines two adjacent [[catalyst.plans.logical.Filter Filter]] operators into one, merging the
+ * conditions into one conjunctive predicate.
+ */
+object CombineFilters extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case ff@Filter(fc, nf@Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
+  }
+}
+
+/**
+ * Pushes [[catalyst.plans.logical.Filter Filter]] operators through
+ * [[catalyst.plans.logical.Project Project]] operators, in-lining any
+ * [[catalyst.expressions.Alias Aliases]] that were defined in the projection.
+ *
+ * This heuristic is valid assuming the expression evaluation cost is minimal.
+ */
+object PushPredicateThroughProject extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case filter@Filter(condition, project@Project(fields, grandChild)) =>
+      val sourceAliases = fields.collect { case a@Alias(c, _) => a.toAttribute -> c }.toMap
+      project.copy(child = filter.copy(
+        replaceAlias(condition, sourceAliases),
+        grandChild))
+  }
+
+  // Replaces any aliased attributes in the condition with the expressions they alias.
+  def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]): Expression = {
+    condition transform {
+      case a: AttributeReference => sourceAliases.getOrElse(a, a)
+    }
+  }
+}
+
+/**
+ * Pushes down [[catalyst.plans.logical.Filter Filter]] operators where the `condition` can be
+ * evaluated using only the attributes of the left or right side of an inner join.  Other
+ * [[catalyst.plans.logical.Filter Filter]] conditions are moved into the `condition` of the
+ * [[catalyst.plans.logical.Join Join]].
+ */
+object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) =>
+      val allConditions =
+        splitConjunctivePredicates(filterCondition) ++
+          joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
+
+      // Split the predicates into those that can be evaluated on the left, right, and those that
+      // must be evaluated after the join.
+      val (rightConditions, leftOrJoinConditions) =
+        allConditions.partition(_.references subsetOf right.outputSet)
+      val (leftConditions, joinConditions) =
+        leftOrJoinConditions.partition(_.references subsetOf left.outputSet)
+
+      // Build the new left and right side, optionally with the pushed down filters.
+      val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+      val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+      Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And))
+  }
+}
+
+/**
+ * Removes [[catalyst.expressions.Cast Casts]] that are unnecessary because the input is already
+ * the correct type.
+ */
+object SimplifyCasts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    case Cast(e, dataType) if e.dataType == dataType => e
+  }
+}
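
A minimal, self-contained sketch (not part of the commit) of the idea behind
PushPredicateThroughInnerJoin: split a filter condition into conjuncts and partition them by
which side of the join can evaluate them. The types here (Pred, Cmp, Conj) are illustrative
stand-ins, not Catalyst's classes.

    object PushdownSketch {
      sealed trait Pred { def references: Set[String] }
      case class Cmp(attr: String, value: Int) extends Pred { def references = Set(attr) }
      case class Conj(left: Pred, right: Pred) extends Pred {
        def references = left.references ++ right.references
      }

      // Break a conjunctive predicate into its individual conjuncts.
      def splitConjuncts(p: Pred): Seq[Pred] = p match {
        case Conj(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
        case other => other :: Nil
      }

      def main(args: Array[String]): Unit = {
        val leftOutput = Set("a", "b")
        val rightOutput = Set("x", "y")
        val condition = Conj(Cmp("a", 1), Conj(Cmp("x", 2), Cmp("y", 4)))

        val conjuncts = splitConjuncts(condition)
        // Conjuncts referencing only the right side's attributes can be pushed below the join...
        val (rightOnly, rest) = conjuncts.partition(_.references subsetOf rightOutput)
        // ...likewise for the left side; anything else would stay in the join condition.
        val (leftOnly, joinConds) = rest.partition(_.references subsetOf leftOutput)

        println(s"push to left:  $leftOnly")   // List(Cmp(a,1))
        println(s"push to right: $rightOnly")  // List(Cmp(x,2), Cmp(y,4))
        println(s"keep in join:  $joinConds")  // List()
      }
    }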

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
new file mode 100644
index 0000000..22f8ea0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package planning
+
+
+import plans.logical.LogicalPlan
+import trees._
+
+/**
+ * Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans.
+ * Child classes are responsible for specifying a list of [[Strategy]] objects, each of which
+ * can return a list of possible physical plan options.  If a given strategy is unable to plan all
+ * of the remaining operators in the tree, it can call [[planLater]], which returns a placeholder
+ * object that will be filled in using other available strategies.
+ *
+ * TODO: RIGHT NOW ONLY ONE PLAN IS RETURNED EVER...
+ *       PLAN SPACE EXPLORATION WILL BE IMPLEMENTED LATER.
+ *
+ * @tparam PhysicalPlan The type of physical plan produced by this [[QueryPlanner]]
+ */
+abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
+  /** A list of execution strategies that can be used by the planner */
+  def strategies: Seq[Strategy]
+
+  /**
+   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
+   * be used for execution. If this strategy does not apply to the given logical operation then an
+   * empty list should be returned.
+   */
+  abstract protected class Strategy extends Logging {
+    def apply(plan: LogicalPlan): Seq[PhysicalPlan]
+  }
+
+  /**
+   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
+   * filled in automatically by the QueryPlanner using the other execution strategies that are
+   * available.
+   */
+  protected def planLater(plan: LogicalPlan) = apply(plan).next()
+
+  def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
+    // Obviously a lot to do here still...
+    val iter = strategies.view.flatMap(_(plan)).toIterator
+    assert(iter.hasNext, s"No plan for $plan")
+    iter
+  }
+}
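
A toy version (not from the commit) of the strategy/planLater recursion, assuming each strategy
plans one operator and delegates its children back to the planner; LOp and POp are illustrative
stand-ins for the logical and physical plan types.

    object PlannerSketch {
      sealed trait LOp
      case class LFilter(cond: String, child: LOp) extends LOp
      case class LScan(table: String) extends LOp

      sealed trait POp
      case class PFilter(cond: String, child: POp) extends POp
      case class PScan(table: String) extends POp

      trait Strategy { def apply(plan: LOp): Seq[POp] }

      // Plans Filter nodes only; children are handed back to the planner via planLater.
      object FilterStrategy extends Strategy {
        def apply(plan: LOp) = plan match {
          case LFilter(c, child) => PFilter(c, planLater(child)) :: Nil
          case _ => Nil
        }
      }

      object ScanStrategy extends Strategy {
        def apply(plan: LOp) = plan match {
          case LScan(t) => PScan(t) :: Nil
          case _ => Nil
        }
      }

      val strategies: Seq[Strategy] = Seq(FilterStrategy, ScanStrategy)

      def plan(logical: LOp): Iterator[POp] = {
        val iter = strategies.view.flatMap(_(logical)).iterator
        assert(iter.hasNext, s"No plan for $logical")
        iter
      }

      // As in this commit, the "placeholder" simply plans the subtree eagerly and takes
      // the first result; real plan-space exploration comes later.
      def planLater(logical: LOp): POp = plan(logical).next()

      def main(args: Array[String]): Unit = {
        println(plan(LFilter("x > 1", LScan("t"))).next())  // PFilter(x > 1,PScan(t))
      }
    }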

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/package.scala
new file mode 100644
index 0000000..64370ec
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * Contains classes for enumerating possible physical plans for a given logical query plan.
+ */
+package object planning

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
new file mode 100644
index 0000000..613b028
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package planning
+
+import scala.annotation.tailrec
+
+import expressions._
+import plans.logical._
+
+/**
+ * A pattern that matches any number of filter operations on top of another relational operator.
+ * Adjacent filter operators are collected and their conditions are broken up and returned as a
+ * sequence of conjunctive predicates.
+ *
+ * @return A tuple containing a sequence of conjunctive predicates that should be used to filter the
+ *         output and a relational operator.
+ */
+object FilteredOperation extends PredicateHelper {
+  type ReturnType = (Seq[Expression], LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = Some(collectFilters(Nil, plan))
+
+  @tailrec
+  private def collectFilters(filters: Seq[Expression], plan: LogicalPlan): ReturnType = plan match {
+    case Filter(condition, child) =>
+      collectFilters(filters ++ splitConjunctivePredicates(condition), child)
+    case other => (filters, other)
+  }
+}
+
+/**
+ * A pattern that matches any number of project or filter operations on top of another relational
+ * operator.  All filter operators are collected and their conditions are broken up and returned
+ * together with the top project operator.  [[Alias Aliases]] are in-lined/substituted if necessary.
+ */
+object PhysicalOperation extends PredicateHelper {
+  type ReturnType = (Seq[NamedExpression], Seq[Expression], LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = {
+    val (fields, filters, child, _) = collectProjectsAndFilters(plan)
+    Some((fields.getOrElse(child.output), filters, child))
+  }
+
+  /**
+   * Collects projects and filters, in-lining/substituting aliases if necessary.  Here are two
+   * examples for alias in-lining/substitution.  Before:
+   * {{{
+   *   SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10
+   *   SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10
+   * }}}
+   * After:
+   * {{{
+   *   SELECT key AS c1 FROM t1 WHERE key > 10
+   *   SELECT key AS c2 FROM t1 WHERE key > 10
+   * }}}
+   */
+  def collectProjectsAndFilters(plan: LogicalPlan):
+      (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) =
+    plan match {
+      case Project(fields, child) =>
+        val (_, filters, other, aliases) = collectProjectsAndFilters(child)
+        val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+        (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
+
+      case Filter(condition, child) =>
+        val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
+        val substitutedCondition = substitute(aliases)(condition)
+        (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
+
+      case other =>
+        (None, Nil, other, Map.empty)
+    }
+
+  def collectAliases(fields: Seq[Expression]) = fields.collect {
+    case a @ Alias(child, _) => a.toAttribute.asInstanceOf[Attribute] -> child
+  }.toMap
+
+  def substitute(aliases: Map[Attribute, Expression])(expr: Expression) = expr.transform {
+    case a @ Alias(ref: AttributeReference, name) =>
+      aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifiers)).getOrElse(a)
+
+    case a: AttributeReference =>
+      aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifiers)).getOrElse(a)
+  }
+}
+
+/**
+ * A pattern that collects all adjacent unions and returns their children as a Seq.
+ */
+object Unions {
+  def unapply(plan: LogicalPlan): Option[Seq[LogicalPlan]] = plan match {
+    case u: Union => Some(collectUnionChildren(u))
+    case _ => None
+  }
+
+  private def collectUnionChildren(plan: LogicalPlan): Seq[LogicalPlan] = plan match {
+    case Union(l, r) => collectUnionChildren(l) ++ collectUnionChildren(r)
+    case other => other :: Nil
+  }
+}
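
A small stand-alone illustration (not part of the commit) of the extractor style used above: an
unapply that peels adjacent Filter operators with a tail-recursive helper, in the spirit of
FilteredOperation. Plan, Filter and Scan here are toy types, not Catalyst's.

    object CollectFiltersSketch {
      sealed trait Plan
      case class Filter(condition: String, child: Plan) extends Plan
      case class Scan(table: String) extends Plan

      object FilteredOp {
        def unapply(plan: Plan): Option[(Seq[String], Plan)] = Some(collect(Nil, plan))

        // Walk down the tree, accumulating filter conditions until a non-Filter node is hit.
        @scala.annotation.tailrec
        private def collect(conds: Seq[String], plan: Plan): (Seq[String], Plan) = plan match {
          case Filter(c, child) => collect(conds :+ c, child)
          case other => (conds, other)
        }
      }

      def main(args: Array[String]): Unit = {
        Filter("a > 1", Filter("b < 2", Scan("t"))) match {
          case FilteredOp(conditions, base) =>
            println(conditions)  // List(a > 1, b < 2)
            println(base)        // Scan(t)
        }
      }
    }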

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
new file mode 100644
index 0000000..20f230c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+
+import catalyst.expressions.{SortOrder, Attribute, Expression}
+import catalyst.trees._
+
+abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
+  self: PlanType with Product =>
+
+  def output: Seq[Attribute]
+
+  /**
+   * Returns the set of attributes that are output by this node.
+   */
+  def outputSet: Set[Attribute] = output.toSet
+
+  /**
+   * Runs [[transform]] with `rule` on all expressions present in this query operator.
+   * Users should not expect a specific directionality. If a specific directionality is needed,
+   * transformExpressionsDown or transformExpressionsUp should be used.
+   * @param rule the rule to be applied to every expression in this operator.
+   */
+  def transformExpressions(rule: PartialFunction[Expression, Expression]): this.type = {
+    transformExpressionsDown(rule)
+  }
+
+  /**
+   * Runs [[transformDown]] with `rule` on all expressions present in this query operator.
+   * @param rule the rule to be applied to every expression in this operator.
+   */
+  def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type = {
+    var changed = false
+
+    @inline def transformExpressionDown(e: Expression) = {
+      val newE = e.transformDown(rule)
+      if (newE.id != e.id && newE != e) {
+        changed = true
+        newE
+      } else {
+        e
+      }
+    }
+
+    val newArgs = productIterator.map {
+      case e: Expression => transformExpressionDown(e)
+      case Some(e: Expression) => Some(transformExpressionDown(e))
+      case m: Map[_,_] => m
+      case seq: Traversable[_] => seq.map {
+        case e: Expression => transformExpressionDown(e)
+        case other => other
+      }
+      case other: AnyRef => other
+    }.toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Runs [[transformUp]] with `rule` on all expressions present in this query operator.
+   * @param rule the rule to be applied to every expression in this operator.
+   * @return this operator with `rule` applied bottom-up to all of its expressions.
+   */
+  def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): this.type = {
+    var changed = false
+
+    @inline def transformExpressionUp(e: Expression) = {
+      val newE = e.transformUp(rule)
+      if (newE.id != e.id && newE != e) {
+        changed = true
+        newE
+      } else {
+        e
+      }
+    }
+
+    val newArgs = productIterator.map {
+      case e: Expression => transformExpressionUp(e)
+      case Some(e: Expression) => Some(transformExpressionUp(e))
+      case m: Map[_,_] => m
+      case seq: Traversable[_] => seq.map {
+        case e: Expression => transformExpressionUp(e)
+        case other => other
+      }
+      case other: AnyRef => other
+    }.toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /** Returns the result of running [[transformExpressions]] on this node
+    * and all its children. */
+  def transformAllExpressions(rule: PartialFunction[Expression, Expression]): this.type = {
+    transform {
+      case q: QueryPlan[_] => q.transformExpressions(rule).asInstanceOf[PlanType]
+    }.asInstanceOf[this.type]
+  }
+
+  /** Returns all of the expressions present in this query plan operator. */
+  def expressions: Seq[Expression] = {
+    productIterator.flatMap {
+      case e: Expression => e :: Nil
+      case Some(e: Expression) => e :: Nil
+      case seq: Traversable[_] => seq.flatMap {
+        case e: Expression => e :: Nil
+        case other => Nil
+      }
+      case other => Nil
+    }.toSeq
+  }
+}
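
A sketch (not from the commit) of how productIterator exposes expressions buried in an operator's
constructor fields, which is what `expressions` and the transformExpressions* methods rely on.
Expr and Op are made-up types used only for illustration.

    object ExpressionsSketch {
      case class Expr(name: String)

      // An "operator" whose constructor mixes a bare expression, an optional one,
      // a sequence of them, and an unrelated field.
      case class Op(single: Expr, optional: Option[Expr], many: Seq[Expr], other: String)

      def expressionsOf(op: Product): Seq[Expr] =
        op.productIterator.flatMap {
          case e: Expr => e :: Nil
          case Some(e: Expr) => e :: Nil
          case seq: Iterable[_] => seq.collect { case e: Expr => e }
          case _ => Nil
        }.toSeq

      def main(args: Array[String]): Unit = {
        val op = Op(Expr("a"), Some(Expr("b")), Seq(Expr("c"), Expr("d")), "ignored")
        println(expressionsOf(op))  // List(Expr(a), Expr(b), Expr(c), Expr(d))
      }
    }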

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
new file mode 100644
index 0000000..9f2283a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+
+sealed abstract class JoinType
+case object Inner extends JoinType
+case object LeftOuter extends JoinType
+case object RightOuter extends JoinType
+case object FullOuter extends JoinType

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
new file mode 100644
index 0000000..48ff45c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+abstract class BaseRelation extends LeafNode {
+  self: Product =>
+
+  def tableName: String
+  def isPartitioned: Boolean = false
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
new file mode 100644
index 0000000..bc7b687
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import catalyst.expressions._
+import catalyst.errors._
+import catalyst.types.StructType
+
+abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
+  self: Product =>
+
+  /**
+   * Returns the set of attributes that are referenced by this node
+   * during evaluation.
+   */
+  def references: Set[Attribute]
+
+  /**
+   * Returns the set of attributes that this node takes as
+   * input from its children.
+   */
+  lazy val inputSet: Set[Attribute] = children.flatMap(_.output).toSet
+
+  /**
+   * Returns true if this plan and all of its children have been resolved to a specific schema,
+   * and false if it still contains any unresolved placeholders. Implementations of LogicalPlan
+   * can override this (e.g. [[catalyst.analysis.UnresolvedRelation UnresolvedRelation]] should
+   * return `false`).
+   */
+  lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
+
+  /**
+   * Returns true if all of this query plan's children have been resolved.
+   */
+  def childrenResolved = !children.exists(!_.resolved)
+
+  /**
+   * Optionally resolves the given string to a
+   * [[catalyst.expressions.NamedExpression NamedExpression]]. The attribute is expressed as
+   * a string in the following form: `[scope].AttributeName.[nested].[fields]...`.
+   */
+  def resolve(name: String): Option[NamedExpression] = {
+    val parts = name.split("\\.")
+    // Collect all attributes that are output by this node's children where either the first part
+    // matches the name or where the first part matches the scope and the second part matches the
+    // name.  Return these matches along with any remaining parts, which represent dotted access to
+    // struct fields.
+    val options = children.flatMap(_.output).flatMap { option =>
+      // If the first part of the desired name matches a qualifier for this possible match, drop it.
+      val remainingParts = if (option.qualifiers contains parts.head) parts.drop(1) else parts
+      if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
+    }
+
+    options.distinct match {
+      case (a, Nil) :: Nil => Some(a) // One match, no nested fields, use it.
+      // One match, but we also need to extract the requested nested field.
+      case (a, nestedFields) :: Nil =>
+        a.dataType match {
+          case StructType(fields) =>
+            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
+          case _ => None // Don't know how to resolve these field references
+        }
+      case Nil => None         // No matches.
+      case ambiguousReferences =>
+        throw new TreeNodeException(
+          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
+    }
+  }
+}
+
+/**
+ * A logical plan node with no children.
+ */
+abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
+  self: Product =>
+
+  // Leaf nodes by definition cannot reference any input attributes.
+  def references = Set.empty
+}
+
+/**
+ * A logical node that represents a non-query command to be executed by the system.  For example,
+ * commands can be used by parsers to represent DDL operations.
+ */
+abstract class Command extends LeafNode {
+  self: Product =>
+  def output = Seq.empty
+}
+
+/**
+ * Returned for commands supported by a given parser, but not catalyst.  In general these are DDL
+ * commands that are passed directly to another system.
+ */
+case class NativeCommand(cmd: String) extends Command
+
+/**
+ * Returned by a parser when the user only wants to see what query plan would be executed, without
+ * actually performing the execution.
+ */
+case class ExplainCommand(plan: LogicalPlan) extends Command
+
+/**
+ * A logical plan node with a single child.
+ */
+abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
+  self: Product =>
+}
+
+/**
+ * A logical plan node with a left and right child.
+ */
+abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
+  self: Product =>
+}
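
A stand-alone sketch (not from the commit) of the qualifier/name matching performed by `resolve`,
over plain (qualifiers, name) tuples rather than NamedExpressions; the attribute names and tables
below are made up for illustration.

    object ResolveSketch {
      // Each output attribute is modelled as (qualifiers, attribute name).
      val output: Seq[(Seq[String], String)] = Seq(
        (Seq("t1"), "key"),
        (Seq("t2"), "key"),
        (Seq("t2"), "value"))

      def resolve(name: String): Option[(Seq[String], String, List[String])] = {
        val parts = name.split("\\.")
        val matches = output.flatMap { case (qualifiers, attrName) =>
          // If the first part names a qualifier of this attribute, drop it before matching.
          val remaining = if (qualifiers contains parts.head) parts.drop(1) else parts
          if (attrName == remaining.head) Seq((qualifiers, attrName, remaining.tail.toList)) else Nil
        }
        matches match {
          case single :: Nil => Some(single)  // unique match; the tail holds nested struct fields
          case Nil => None                    // no match
          case ambiguous => sys.error(s"Ambiguous reference to $name: $ambiguous")
        }
      }

      def main(args: Array[String]): Unit = {
        println(resolve("t1.key"))   // Some((List(t1),key,List()))
        println(resolve("value"))    // Some((List(t2),value,List()))
        // resolve("key") would throw: ambiguous between t1.key and t2.key
      }
    }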

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
new file mode 100644
index 0000000..1a1a2b9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import expressions._
+
+/**
+ * Transforms the input by forking and running the specified script.
+ *
+ * @param input the set of expressions that should be passed to the script.
+ * @param script the command that should be executed.
+ * @param output the attributes that are produced by the script.
+ */
+case class ScriptTransformation(
+    input: Seq[Expression],
+    script: String,
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  def references = input.flatMap(_.references).toSet
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
new file mode 100644
index 0000000..b5905a4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import expressions._
+import rules._
+
+object LocalRelation {
+  def apply(output: Attribute*) =
+    new LocalRelation(output)
+}
+
+case class LocalRelation(output: Seq[Attribute], data: Seq[Product] = Nil)
+  extends LeafNode with analysis.MultiInstanceRelation {
+
+  // TODO: Validate schema compliance.
+  def loadData(newData: Seq[Product]) = new LocalRelation(output, data ++ newData)
+
+  /**
+   * Returns an identical copy of this relation with new exprIds for all attributes.  Different
+   * attributes are required when a relation is going to be included multiple times in the same
+   * query.
+   */
+  override final def newInstance: this.type = {
+    LocalRelation(output.map(_.newInstance), data).asInstanceOf[this.type]
+  }
+
+  override protected def stringArgs = Iterator(output)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
new file mode 100644
index 0000000..8e98aab
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import expressions._
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+  def output = projectList.map(_.toAttribute)
+  def references = projectList.flatMap(_.references).toSet
+}
+
+/**
+ * Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the
+ * output of each into a new stream of rows.  This operation is similar to a `flatMap` in functional
+ * programming, with one important additional feature: the input rows can be joined with
+ * their output.
+ * @param join  when true, each output row is implicitly joined with the input tuple that produced
+ *              it.
+ * @param outer when true, each input row will be output at least once, even if the output of the
+ *              given `generator` is empty. `outer` has no effect when `join` is false.
+ * @param alias when set, this string is applied to the schema of the output of the transformation
+ *              as a qualifier.
+ */
+case class Generate(
+    generator: Generator,
+    join: Boolean,
+    outer: Boolean,
+    alias: Option[String],
+    child: LogicalPlan)
+  extends UnaryNode {
+
+  protected def generatorOutput =
+    alias
+      .map(a => generator.output.map(_.withQualifiers(a :: Nil)))
+      .getOrElse(generator.output)
+
+  def output =
+    if (join) child.output ++ generatorOutput else generatorOutput
+
+  def references =
+    if (join) child.outputSet else generator.references
+}
+
+case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
+  def output = child.output
+  def references = condition.references
+}
+
+case class Union(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+  // TODO: These aren't really the same attributes as nullability etc might change.
+  def output = left.output
+
+  override lazy val resolved =
+    childrenResolved &&
+    !left.output.zip(right.output).exists { case (l,r) => l.dataType != r.dataType }
+
+  def references = Set.empty
+}
+
+case class Join(
+  left: LogicalPlan,
+  right: LogicalPlan,
+  joinType: JoinType,
+  condition: Option[Expression]) extends BinaryNode {
+
+  def references = condition.map(_.references).getOrElse(Set.empty)
+  def output = left.output ++ right.output
+}
+
+case class InsertIntoTable(
+    table: BaseRelation,
+    partition: Map[String, Option[String]],
+    child: LogicalPlan,
+    overwrite: Boolean)
+  extends LogicalPlan {
+  // The table being inserted into is a child for the purposes of transformations.
+  def children = table :: child :: Nil
+  def references = Set.empty
+  def output = child.output
+
+  override lazy val resolved = childrenResolved && child.output.zip(table.output).forall {
+    case (childAttr, tableAttr) => childAttr.dataType == tableAttr.dataType
+  }
+}
+
+case class InsertIntoCreatedTable(
+    databaseName: Option[String],
+    tableName: String,
+    child: LogicalPlan) extends UnaryNode {
+  def references = Set.empty
+  def output = child.output
+}
+
+case class WriteToFile(
+    path: String,
+    child: LogicalPlan) extends UnaryNode {
+  def references = Set.empty
+  def output = child.output
+}
+
+case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode {
+  def output = child.output
+  def references = order.flatMap(_.references).toSet
+}
+
+case class Aggregate(
+    groupingExpressions: Seq[Expression],
+    aggregateExpressions: Seq[NamedExpression],
+    child: LogicalPlan)
+  extends UnaryNode {
+
+  def output = aggregateExpressions.map(_.toAttribute)
+  def references = child.references
+}
+
+case class StopAfter(limit: Expression, child: LogicalPlan) extends UnaryNode {
+  def output = child.output
+  def references = limit.references
+}
+
+case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
+  def output = child.output.map(_.withQualifiers(alias :: Nil))
+  def references = Set.empty
+}
+
+case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
+    extends UnaryNode {
+
+  def output = child.output
+  def references = Set.empty
+}
+
+case class Distinct(child: LogicalPlan) extends UnaryNode {
+  def output = child.output
+  def references = child.outputSet
+}
+
+case object NoRelation extends LeafNode {
+  def output = Nil
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
new file mode 100644
index 0000000..f7fcdc5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import expressions._
+
+/**
+ * Performs a physical redistribution of the data.  Used when the consumer of the query
+ * result has expectations about the distribution and ordering of partitioned input data.
+ */
+abstract class RedistributeData extends UnaryNode {
+  self: Product =>
+
+  def output = child.output
+}
+
+case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
+    extends RedistributeData {
+
+  def references = sortExpressions.flatMap(_.references).toSet
+}
+
+case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
+    extends RedistributeData {
+
+  def references = partitionExpressions.flatMap(_.references).toSet
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala
new file mode 100644
index 0000000..a40ab4b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * A collection of common abstractions for query plans, as well as
+ * a base logical plan representation.
+ */
+package object plans

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
new file mode 100644
index 0000000..2d8f3ad
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package physical
+
+import expressions._
+import types._
+
+/**
+ * Specifies how tuples that share common expressions will be distributed when a query is executed
+ * in parallel on many machines.  Distribution can be used to refer to two distinct physical
+ * properties:
+ *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
+ *    partitioned across physical machines in a cluster.  Knowing this property allows some
+ *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
+ *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
+ *    about how tuples are distributed within a single partition.
+ */
+sealed trait Distribution
+
+/**
+ * Represents a distribution where no promises are made about co-location of data.
+ */
+case object UnspecifiedDistribution extends Distribution
+
+/**
+ * Represents a distribution that only has a single partition and all tuples of the dataset
+ * are co-located.
+ */
+case object AllTuples extends Distribution
+
+/**
+ * Represents data where tuples that share the same values for the `clustering`
+ * [[catalyst.expressions.Expression Expressions]] will be co-located. Based on the context, this
+ * can mean such tuples are either co-located in the same partition or they will be contiguous
+ * within a single partition.
+ */
+case class ClusteredDistribution(clustering: Seq[Expression]) extends Distribution {
+  require(
+    clustering != Nil,
+    "The clustering expressions of a ClusteredDistribution should not be Nil. " +
+      "An AllTuples should be used to represent a distribution that only has " +
+      "a single partition.")
+}
+
+/**
+ * Represents data where tuples have been ordered according to the `ordering`
+ * [[catalyst.expressions.Expression Expressions]].  This is a strictly stronger guarantee than
+ * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the same value for
+ * the ordering expressions are contiguous and will never be split across partitions.
+ */
+case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
+  require(
+    ordering != Nil,
+    "The ordering expressions of a OrderedDistribution should not be Nil. " +
+      "An AllTuples should be used to represent a distribution that only has " +
+      "a single partition.")
+
+  def clustering = ordering.map(_.child).toSet
+}
+
+sealed trait Partitioning {
+  /** Returns the number of partitions that the data is split across */
+  val numPartitions: Int
+
+  /**
+   * Returns true iff the guarantees made by this
+   * [[catalyst.plans.physical.Partitioning Partitioning]] are sufficient to satisfy
+   * the partitioning scheme mandated by the `required`
+   * [[catalyst.plans.physical.Distribution Distribution]], i.e. the current dataset does not
+   * need to be re-partitioned for the `required` Distribution (it is possible that tuples within
+   * a partition need to be reorganized).
+   */
+  def satisfies(required: Distribution): Boolean
+
+  /**
+   * Returns true iff all distribution guarantees made by this partitioning can also be made
+   * for the `other` specified partitioning.
+   * For example, two [[catalyst.plans.physical.HashPartitioning HashPartitioning]]s are
+   * only compatible if they have the same `numPartitions`.
+   */
+  def compatibleWith(other: Partitioning): Boolean
+}
+
+case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
+  override def satisfies(required: Distribution): Boolean = required match {
+    case UnspecifiedDistribution => true
+    case _ => false
+  }
+
+  override def compatibleWith(other: Partitioning): Boolean = other match {
+    case UnknownPartitioning(_) => true
+    case _ => false
+  }
+}
+
+case object SinglePartition extends Partitioning {
+  val numPartitions = 1
+
+  override def satisfies(required: Distribution): Boolean = true
+
+  override def compatibleWith(other: Partitioning) = other match {
+    case SinglePartition => true
+    case _ => false
+  }
+}
+
+case object BroadcastPartitioning extends Partitioning {
+  val numPartitions = 1
+
+  override def satisfies(required: Distribution): Boolean = true
+
+  override def compatibleWith(other: Partitioning) = other match {
+    case SinglePartition => true
+    case _ => false
+  }
+}
+
+/**
+ * Represents a partitioning where rows are split up across partitions based on the hash
+ * of `expressions`.  All rows where `expressions` evaluate to the same values are guaranteed to be
+ * in the same partition.
+ */
+case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
+  extends Expression
+  with Partitioning {
+
+  def children = expressions
+  def references = expressions.flatMap(_.references).toSet
+  def nullable = false
+  def dataType = IntegerType
+
+  lazy val clusteringSet = expressions.toSet
+
+  override def satisfies(required: Distribution): Boolean = required match {
+    case UnspecifiedDistribution => true
+    case ClusteredDistribution(requiredClustering) =>
+      clusteringSet.subsetOf(requiredClustering.toSet)
+    case _ => false
+  }
+
+  override def compatibleWith(other: Partitioning) = other match {
+    case BroadcastPartitioning => true
+    case h: HashPartitioning if h == this => true
+    case _ => false
+  }
+}
+
+/**
+ * Represents a partitioning where rows are split across partitions based on some total ordering of
+ * the expressions specified in `ordering`.  When data is partitioned in this manner the following
+ * two conditions are guaranteed to hold:
+ *  - All rows where the expressions in `ordering` evaluate to the same values will be in the same
+ *    partition.
+ *  - Each partition will have a `min` and `max` row, relative to the given ordering.  All rows
+ *    that are in between `min` and `max` in this `ordering` will reside in this partition.
+ */
+case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
+  extends Expression
+  with Partitioning {
+
+  def children = ordering
+  def references = ordering.flatMap(_.references).toSet
+  def nullable = false
+  def dataType = IntegerType
+
+  lazy val clusteringSet = ordering.map(_.child).toSet
+
+  override def satisfies(required: Distribution): Boolean = required match {
+    case UnspecifiedDistribution => true
+    case OrderedDistribution(requiredOrdering) =>
+      val minSize = Seq(requiredOrdering.size, ordering.size).min
+      requiredOrdering.take(minSize) == ordering.take(minSize)
+    case ClusteredDistribution(requiredClustering) =>
+      clusteringSet.subsetOf(requiredClustering.toSet)
+    case _ => false
+  }
+
+  override def compatibleWith(other: Partitioning) = other match {
+    case BroadcastPartitioning => true
+    case r: RangePartitioning if r == this => true
+    case _ => false
+  }
+}
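
A toy illustration (not from the commit) of the subset test behind
HashPartitioning.satisfies(ClusteredDistribution): hashing on a subset of the required clustering
keys is enough to co-locate rows that agree on all of them. Expressions are modelled as plain
strings here.

    object SatisfiesSketch {
      sealed trait Distribution
      case object Unspecified extends Distribution
      case class Clustered(clustering: Set[String]) extends Distribution

      case class HashPart(expressions: Set[String], numPartitions: Int) {
        def satisfies(required: Distribution): Boolean = required match {
          case Unspecified => true
          // Rows that agree on every clustering key also agree on the hashed keys,
          // so they hash to the same partition.
          case Clustered(clustering) => expressions.subsetOf(clustering)
        }
      }

      def main(args: Array[String]): Unit = {
        println(HashPart(Set("a"), 200).satisfies(Clustered(Set("a", "b"))))  // true
        println(HashPart(Set("a", "b"), 200).satisfies(Clustered(Set("a"))))  // false:
        // rows with equal `a` but different `b` may land in different partitions.
      }
    }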

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
new file mode 100644
index 0000000..6ff4891
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package rules
+
+import trees._
+
+abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
+
+  /** Name for this rule, automatically inferred based on class name. */
+  val ruleName: String = {
+    val className = getClass.getName
+    if (className endsWith "$") className.dropRight(1) else className
+  }
+
+  def apply(plan: TreeType): TreeType
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
new file mode 100644
index 0000000..68ae30c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package rules
+
+import trees._
+import util._
+
+abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
+
+  /**
+   * An execution strategy for rules that indicates the maximum number of executions. If the
+   * execution reaches a fixed point (i.e. converges) before maxIterations, it will stop.
+   */
+  abstract class Strategy { def maxIterations: Int }
+
+  /** A strategy that only runs once. */
+  case object Once extends Strategy { val maxIterations = 1 }
+
+  /** A strategy that runs until a fixed point is reached or maxIterations is hit, whichever comes first. */
+  case class FixedPoint(maxIterations: Int) extends Strategy
+
+  /** A batch of rules. */
+  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
+
+  /** Defines a sequence of rule batches, to be overridden by the implementation. */
+  protected val batches: Seq[Batch]
+
+  /**
+   * Executes the batches of rules defined by the subclass. The batches are executed serially
+   * using the defined execution strategy. Within each batch, rules are also executed serially.
+   */
+  def apply(plan: TreeType): TreeType = {
+    var curPlan = plan
+
+    batches.foreach { batch =>
+      var iteration = 1
+      var lastPlan = curPlan
+      curPlan = batch.rules.foldLeft(curPlan) { case (curPlan, rule) => rule(curPlan) }
+
+      // Run until a fixed point is reached (or the maximum number of iterations specified in the strategy is hit).
+      while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
+        lastPlan = curPlan
+        curPlan = batch.rules.foldLeft(curPlan) {
+          case (curPlan, rule) =>
+            val result = rule(curPlan)
+            if (!result.fastEquals(curPlan)) {
+              logger.debug(
+                s"""
+                  |=== Applying Rule ${rule.ruleName} ===
+                  |${sideBySide(curPlan.treeString, result.treeString).mkString("\n")}
+                """.stripMargin)
+            }
+
+            result
+        }
+        iteration += 1
+      }
+    }
+
+    curPlan
+  }
+}
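
Subclasses are expected to declare their batches declaratively and let the inherited apply
method drive them. A minimal sketch, reusing the hypothetical RemoveTrivialFilters rule from
above (SimpleOptimizer and analyzedPlan are invented names):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.RuleExecutor

    // Hypothetical executor: applies RemoveTrivialFilters until the plan stops changing,
    // or 100 iterations, whichever comes first.
    object SimpleOptimizer extends RuleExecutor[LogicalPlan] {
      val batches =
        Batch("Simplify filters", FixedPoint(100), RemoveTrivialFilters) :: Nil
    }

    // Usage: val optimizedPlan = SimpleOptimizer(analyzedPlan)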

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/package.scala
new file mode 100644
index 0000000..26ab543
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * A framework for applying batches of rewrite rules to trees, possibly to a fixed point.
+ */
+package object rules

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
new file mode 100644
index 0000000..76ede87
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package trees
+
+import errors._
+
+object TreeNode {
+  private val currentId = new java.util.concurrent.atomic.AtomicLong
+  protected def nextId() = currentId.getAndIncrement()
+}
+
+/** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */
+private class MutableInt(var i: Int)
+
+abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
+  self: BaseType with Product =>
+
+  /** Returns a Seq of the children of this node */
+  def children: Seq[BaseType]
+
+  /**
+   * A globally unique id for this specific instance. Not preserved across copies.
+   * Unlike `equals`, `id` can be used to differentiate distinct but structurally
+   * identical branches of a tree.
+   */
+  val id = TreeNode.nextId()
+
+  /**
+   * Returns true if other is the same [[catalyst.trees.TreeNode TreeNode]] instance.  Unlike
+   * `equals`, this function will return false for different instances of structurally identical
+   * trees.
+   */
+  def sameInstance(other: TreeNode[_]): Boolean = {
+    this.id == other.id
+  }
+
+  /**
+   * Faster version of equality which short-circuits when two TreeNodes are the same instance.
+   * We don't just override Object.equals, as doing so prevents the Scala compiler from
+   * generating case class `equals` methods.
+   */
+  def fastEquals(other: TreeNode[_]): Boolean = {
+    sameInstance(other) || this == other
+  }
+
+  /**
+   * Runs the given function on this node and then recursively on [[children]].
+   * @param f the function to be applied to each node in the tree.
+   */
+  def foreach(f: BaseType => Unit): Unit = {
+    f(this)
+    children.foreach(_.foreach(f))
+  }
+
+  /**
+   * Returns a Seq containing the result of applying the given function to each
+   * node in this tree in a preorder traversal.
+   * @param f the function to be applied.
+   */
+  def map[A](f: BaseType => A): Seq[A] = {
+    val ret = new collection.mutable.ArrayBuffer[A]()
+    foreach(ret += f(_))
+    ret
+  }
+
+  /**
+   * Returns a Seq by applying a function to all nodes in this tree and using the elements of the
+   * resulting collections.
+   */
+  def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = {
+    val ret = new collection.mutable.ArrayBuffer[A]()
+    foreach(ret ++= f(_))
+    ret
+  }
+
+  /**
+   * Returns a Seq containing the result of applying a partial function to all elements in this
+   * tree on which the function is defined.
+   */
+  def collect[B](pf: PartialFunction[BaseType, B]): Seq[B] = {
+    val ret = new collection.mutable.ArrayBuffer[B]()
+    val lifted = pf.lift
+    foreach(node => lifted(node).foreach(ret.+=))
+    ret
+  }
+
+  /**
+   * Returns a copy of this node where `f` has been applied to all of this node's children.
+   */
+  def mapChildren(f: BaseType => BaseType): this.type = {
+    var changed = false
+    val newArgs = productIterator.map {
+      case arg: TreeNode[_] if children contains arg =>
+        val newChild = f(arg.asInstanceOf[BaseType])
+        if (newChild fastEquals arg) {
+          arg
+        } else {
+          changed = true
+          newChild
+        }
+      case nonChild: AnyRef => nonChild
+      case null => null
+    }.toArray
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Returns a copy of this node with the children replaced.
+   * TODO: Validate somewhere (in debug mode?) that children are ordered correctly.
+   */
+  def withNewChildren(newChildren: Seq[BaseType]): this.type = {
+    assert(newChildren.size == children.size, "Incorrect number of children")
+    var changed = false
+    val remainingNewChildren = newChildren.toBuffer
+    val remainingOldChildren = children.toBuffer
+    val newArgs = productIterator.map {
+      case arg: TreeNode[_] if children contains arg =>
+        val newChild = remainingNewChildren.remove(0)
+        val oldChild = remainingOldChildren.remove(0)
+        if (newChild fastEquals oldChild) {
+          oldChild
+        } else {
+          changed = true
+          newChild
+        }
+      case nonChild: AnyRef => nonChild
+      case null => null
+    }.toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Returns a copy of this node where `rule` has been recursively applied to the tree.
+   * When `rule` does not apply to a given node it is left unchanged.
+   * Users should not expect a specific directionality. If a specific directionality is needed,
+   * transformDown or transformUp should be used.
+   * @param rule the function used to transform this node's children
+   */
+  def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
+    transformDown(rule)
+  }
+
+  /**
+   * Returns a copy of this node where `rule` has been recursively applied to it and all of its
+   * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
+   * @param rule the function used to transform this node's children
+   */
+  def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
+    val afterRule = rule.applyOrElse(this, identity[BaseType])
+    // Check if unchanged and then possibly return old copy to avoid gc churn.
+    if (this fastEquals afterRule) {
+      transformChildrenDown(rule)
+    } else {
+      afterRule.transformChildrenDown(rule)
+    }
+  }
+
+  /**
+   * Returns a copy of this node where `rule` has been recursively applied to all the children of
+   * this node.  When `rule` does not apply to a given node it is left unchanged.
+   * @param rule the function used to transform this node's children
+   */
+  def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {
+    var changed = false
+    val newArgs = productIterator.map {
+      case arg: TreeNode[_] if children contains arg =>
+        val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
+        if (!(newChild fastEquals arg)) {
+          changed = true
+          newChild
+        } else {
+          arg
+        }
+      case m: Map[_,_] => m
+      case args: Traversable[_] => args.map {
+        case arg: TreeNode[_] if children contains arg =>
+          val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
+          if (!(newChild fastEquals arg)) {
+            changed = true
+            newChild
+          } else {
+            arg
+          }
+        case other => other
+      }
+      case nonChild: AnyRef => nonChild
+      case null => null
+    }.toArray
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Returns a copy of this node where `rule` has been recursively applied first to all of its
+   * children and then itself (post-order). When `rule` does not apply to a given node, it is left
+   * unchanged.
+   * @param rule the function used to transform this node's children
+   */
+  def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
+    val afterRuleOnChildren = transformChildrenUp(rule)
+    if (this fastEquals afterRuleOnChildren) {
+      rule.applyOrElse(this, identity[BaseType])
+    } else {
+      rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
+    }
+  }
+
+  def transformChildrenUp(rule: PartialFunction[BaseType, BaseType]): this.type = {
+    var changed = false
+    val newArgs = productIterator.map {
+      case arg: TreeNode[_] if children contains arg =>
+        val newChild = arg.asInstanceOf[BaseType].transformUp(rule)
+        if (!(newChild fastEquals arg)) {
+          changed = true
+          newChild
+        } else {
+          arg
+        }
+      case m: Map[_,_] => m
+      case args: Traversable[_] => args.map {
+        case arg: TreeNode[_] if children contains arg =>
+          val newChild = arg.asInstanceOf[BaseType].transformUp(rule)
+          if (!(newChild fastEquals arg)) {
+            changed = true
+            newChild
+          } else {
+            arg
+          }
+        case other => other
+      }
+      case nonChild: AnyRef => nonChild
+      case null => null
+    }.toArray
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Args to the constructor that should be copied, but not transformed.
+   * These are appended to the transformed args automatically by makeCopy.
+   * @return a sequence of additional constructor arguments; defaults to Nil.
+   */
+  protected def otherCopyArgs: Seq[AnyRef] = Nil
+
+  /**
+   * Creates a copy of this type of tree node after a transformation.
+   * Must be overridden by child classes that have constructor arguments
+   * that are not present in the productIterator.
+   * @param newArgs the new product arguments.
+   */
+  def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {
+    try {
+      val defaultCtor = getClass.getConstructors.head
+      if (otherCopyArgs.isEmpty) {
+        defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type]
+      } else {
+        defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type]
+      }
+    } catch {
+      case e: java.lang.IllegalArgumentException =>
+        throw new TreeNodeException(
+          this, s"Failed to copy node.  Is otherCopyArgs specified correctly for $nodeName?")
+    }
+  }
+
+  /** Returns the name of this type of TreeNode.  Defaults to the class name. */
+  def nodeName = getClass.getSimpleName
+
+  /**
+   * The arguments that should be included in the arg string.  Defaults to the `productIterator`.
+   */
+  protected def stringArgs = productIterator
+
+  /** Returns a string representing the arguments to this node, minus any children */
+  def argString: String = productIterator.flatMap {
+    case tn: TreeNode[_] if children contains tn => Nil
+    case tn: TreeNode[_] if tn.toString contains "\n" => s"(${tn.simpleString})" :: Nil
+    case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
+    case set: Set[_] => set.mkString("{", ",", "}") :: Nil
+    case other => other :: Nil
+  }.mkString(", ")
+
+  /** String representation of this node without any children */
+  def simpleString = s"$nodeName $argString"
+
+  override def toString: String = treeString
+
+  /** Returns a string representation of the nodes in this tree */
+  def treeString = generateTreeString(0, new StringBuilder).toString
+
+  /**
+   * Returns a string representation of the nodes in this tree, where each operator is numbered.
+   * The numbers can be used with [[trees.TreeNode.apply apply]] to easily access specific subtrees.
+   */
+  def numberedTreeString =
+    treeString.split("\n").zipWithIndex.map { case (line, i) => f"$i%02d $line" }.mkString("\n")
+
+  /**
+   * Returns the tree node at the specified number.
+   * Numbers for each node can be found in the [[numberedTreeString]].
+   */
+  def apply(number: Int): BaseType = getNodeNumbered(new MutableInt(number))
+
+  protected def getNodeNumbered(number: MutableInt): BaseType = {
+    if (number.i < 0) {
+      null.asInstanceOf[BaseType]
+    } else if (number.i == 0) {
+      this
+    } else {
+      number.i -= 1
+      children.map(_.getNodeNumbered(number)).find(_ != null).getOrElse(null.asInstanceOf[BaseType])
+    }
+  }
+
+  /** Appends the string representation of this node and its children to the given StringBuilder. */
+  protected def generateTreeString(depth: Int, builder: StringBuilder): StringBuilder = {
+    builder.append(" " * depth)
+    builder.append(simpleString)
+    builder.append("\n")
+    children.foreach(_.generateTreeString(depth + 1, builder))
+    builder
+  }
+}
+
+/**
+ * A [[TreeNode]] that has two children, [[left]] and [[right]].
+ */
+trait BinaryNode[BaseType <: TreeNode[BaseType]] {
+  def left: BaseType
+  def right: BaseType
+
+  def children = Seq(left, right)
+}
+
+/**
+ * A [[TreeNode]] with no children.
+ */
+trait LeafNode[BaseType <: TreeNode[BaseType]] {
+  def children = Nil
+}
+
+/**
+ * A [[TreeNode]] with a single [[child]].
+ */
+trait UnaryNode[BaseType <: TreeNode[BaseType]] {
+  def child: BaseType
+  def children = child :: Nil
+}
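
To make the transform machinery concrete, a small self-contained sketch follows; the Num and
Add node types are invented for illustration and are not part of this patch:

    import org.apache.spark.sql.catalyst.trees.TreeNode

    // A toy expression tree built directly on TreeNode.
    abstract class Expr extends TreeNode[Expr] { self: Product => }
    case class Num(value: Int) extends Expr { def children = Nil }
    case class Add(left: Expr, right: Expr) extends Expr { def children = left :: right :: Nil }

    // transformDown rewrites matching nodes in a pre-order walk; unmatched nodes are kept.
    val tree      = Add(Num(0), Add(Num(1), Num(0)))
    val rewritten = tree transformDown { case Num(0) => Num(42) }
    // rewritten == Add(Num(42), Add(Num(1), Num(42)))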

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
new file mode 100644
index 0000000..e2da1d2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * A library for easily manipulating trees of operators.  Operators that extend TreeNode are
+ * granted the following interface:
+ * <ul>
+ *   <li>Scala collection-like methods (foreach, map, flatMap, collect, etc.)</li>
+ *   <li>
+ *     transform - accepts a partial function that is used to generate a new tree.  When the
+ *     partial function can be applied to a given tree segment, that segment is replaced with the
+ *     result.  After attempting to apply the partial function to a given node, the transform
+ *     function recursively attempts to apply the function to that node's children.
+ *   </li>
+ *   <li>debugging support - pretty printing, easy splicing of trees, etc.</li>
+ * </ul>
+ */
+package object trees {
+  // Since we want tree nodes to be lightweight, we create one logger for all TreeNode instances.
+  protected val logger = Logger("catalyst.trees")
+}
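
Continuing the invented Num/Add sketch from the TreeNode section above, the collection-style
methods traverse the tree in pre-order:

    val tree = Add(Num(1), Add(Num(2), Num(3)))
    tree.collect { case Num(v) => v }   // yields 1, 2, 3 in pre-order
    tree.map(_ => 1).sum                // 5: one entry per node visited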

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
new file mode 100644
index 0000000..6eb2b62
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package types
+
+import expressions.Expression
+
+abstract class DataType {
+  /** Matches any expression that evaluates to this DataType */
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType == this => true
+    case _ => false
+  }
+}
+
+case object NullType extends DataType
+
+abstract class NativeType extends DataType {
+  type JvmType
+  val ordering: Ordering[JvmType]
+}
+
+case object StringType extends NativeType {
+  type JvmType = String
+  val ordering = implicitly[Ordering[JvmType]]
+}
+case object BinaryType extends DataType {
+  type JvmType = Array[Byte]
+}
+case object BooleanType extends NativeType {
+  type JvmType = Boolean
+  val ordering = implicitly[Ordering[JvmType]]
+}
+
+abstract class NumericType extends NativeType {
+  // Unfortunately we can't get this implicitly as that breaks Spark serialization. In order for
+  // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
+  // type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets desugared
+  // by the compiler into an argument to the object's constructor. This means there is no longer a
+  // no-argument constructor and thus the JVM cannot serialize the object anymore.
+  val numeric: Numeric[JvmType]
+}
+
+/** Matcher for any expressions that evaluate to [[IntegralType]]s */
+object IntegralType {
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType.isInstanceOf[IntegralType] => true
+    case _ => false
+  }
+}
+
+abstract class IntegralType extends NumericType {
+  val integral: Integral[JvmType]
+}
+
+case object LongType extends IntegralType {
+  type JvmType = Long
+  val numeric = implicitly[Numeric[Long]]
+  val integral = implicitly[Integral[Long]]
+  val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object IntegerType extends IntegralType {
+  type JvmType = Int
+  val numeric = implicitly[Numeric[Int]]
+  val integral = implicitly[Integral[Int]]
+  val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object ShortType extends IntegralType {
+  type JvmType = Short
+  val numeric = implicitly[Numeric[Short]]
+  val integral = implicitly[Integral[Short]]
+  val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object ByteType extends IntegralType {
+  type JvmType = Byte
+  val numeric = implicitly[Numeric[Byte]]
+  val integral = implicitly[Integral[Byte]]
+  val ordering = implicitly[Ordering[JvmType]]
+}
+
+/** Matcher for any expressions that evaluate to [[FractionalType]]s */
+object FractionalType {
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType.isInstanceOf[FractionalType] => true
+    case _ => false
+  }
+}
+abstract class FractionalType extends NumericType {
+  val fractional: Fractional[JvmType]
+}
+
+case object DecimalType extends FractionalType {
+  type JvmType = BigDecimal
+  val numeric = implicitly[Numeric[BigDecimal]]
+  val fractional = implicitly[Fractional[BigDecimal]]
+  val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object DoubleType extends FractionalType {
+  type JvmType = Double
+  val numeric = implicitly[Numeric[Double]]
+  val fractional = implicitly[Fractional[Double]]
+  val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object FloatType extends FractionalType {
+  type JvmType = Float
+  val numeric = implicitly[Numeric[Float]]
+  val fractional = implicitly[Fractional[Float]]
+  val ordering = implicitly[Ordering[JvmType]]
+}
+
+case class ArrayType(elementType: DataType) extends DataType
+
+case class StructField(name: String, dataType: DataType, nullable: Boolean)
+case class StructType(fields: Seq[StructField]) extends DataType
+
+case class MapType(keyType: DataType, valueType: DataType) extends DataType
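
The unapply methods defined above enable type-based pattern matching when writing rules; the
describe helper below is purely illustrative (not part of this patch):

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.types._

    // Boolean extractors: the pattern IntegralType() matches any expression whose dataType
    // is a Byte/Short/Int/Long type, BooleanType() one whose dataType is exactly BooleanType.
    def describe(e: Expression): String = e match {
      case BooleanType()    => "a predicate"
      case IntegralType()   => "an integral expression"
      case FractionalType() => "a fractional expression"
      case _                => "something else"
    }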

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
new file mode 100644
index 0000000..b65a561
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+/**
+ * Contains a type system for attributes produced by relations, including complex types like
+ * structs, arrays and maps.
+ */
+package object types

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
new file mode 100644
index 0000000..52adea2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
+
+package object util {
+  /**
+   * Returns a path to a temporary file that probably does not exist.
+   * Note that there is always a race condition: someone may have created the
+   * file since the last time we checked.  Thus, this shouldn't be used
+   * for anything security-conscious.
+   */
+  def getTempFilePath(prefix: String, suffix: String = ""): File = {
+    val tempFile = File.createTempFile(prefix, suffix)
+    tempFile.delete()
+    tempFile
+  }
+
+  def fileToString(file: File, encoding: String = "UTF-8") = {
+    val inStream = new FileInputStream(file)
+    val outStream = new ByteArrayOutputStream
+    try {
+      var reading = true
+      while (reading) {
+        inStream.read() match {
+          case -1 => reading = false
+          case c => outStream.write(c)
+        }
+      }
+      outStream.flush()
+    }
+    finally {
+      inStream.close()
+    }
+    new String(outStream.toByteArray, encoding)
+  }
+
+  def resourceToString(
+      resource: String,
+      encoding: String = "UTF-8",
+      classLoader: ClassLoader = this.getClass.getClassLoader) = {
+    val inStream = classLoader.getResourceAsStream(resource)
+    val outStream = new ByteArrayOutputStream
+    try {
+      var reading = true
+      while (reading) {
+        inStream.read() match {
+          case -1 => reading = false
+          case c => outStream.write(c)
+        }
+      }
+      outStream.flush()
+    }
+    finally {
+      inStream.close()
+    }
+    new String(outStream.toByteArray, encoding)
+  }
+
+  def stringToFile(file: File, str: String): File = {
+    val out = new PrintWriter(file)
+    out.write(str)
+    out.close()
+    file
+  }
+
+  def sideBySide(left: String, right: String): Seq[String] = {
+    sideBySide(left.split("\n"), right.split("\n"))
+  }
+
+  def sideBySide(left: Seq[String], right: Seq[String]): Seq[String] = {
+    val maxLeftSize = left.map(_.size).max
+    val leftPadded = left ++ Seq.fill(math.max(right.size - left.size, 0))("")
+    val rightPadded = right ++ Seq.fill(math.max(left.size - right.size, 0))("")
+
+    leftPadded.zip(rightPadded).map {
+      case (l, r) => (if (l == r) " " else "!") + l + (" " * ((maxLeftSize - l.size) + 3)) + r
+    }
+  }
+
+  def stackTraceToString(t: Throwable): String = {
+    val out = new java.io.ByteArrayOutputStream
+    val writer = new PrintWriter(out)
+    t.printStackTrace(writer)
+    writer.flush()
+    new String(out.toByteArray)
+  }
+
+  def stringOrNull(a: AnyRef) = if (a == null) null else a.toString
+
+  def benchmark[A](f: => A): A = {
+    val startTime = System.nanoTime()
+    val ret = f
+    val endTime = System.nanoTime()
+    println(s"${(endTime - startTime).toDouble / 1000000}ms")
+    ret
+  }
+
+  /* FIX ME
+  implicit class debugLogging(a: AnyRef) {
+    def debugLogging() {
+      org.apache.log4j.Logger.getLogger(a.getClass.getName).setLevel(org.apache.log4j.Level.DEBUG)
+    }
+  } */
+}
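
As an illustration of the helpers above (the plan strings are made up), sideBySide is the
utility RuleExecutor uses to print a before/after view of a plan, flagging changed lines
with '!':

    import org.apache.spark.sql.catalyst.util._

    // Lines that differ between the two inputs are prefixed with '!'.
    val before = Seq("Project", " Filter", "  Relation")
    val after  = Seq("Project", "  Relation")
    sideBySide(before, after).foreach(println)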

http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
new file mode 100644
index 0000000..9ec3168
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Allows the execution of relational queries, including those expressed in SQL, using Spark.
+ *
+ * Note that this package is located in catalyst instead of in core so that all subprojects can
+ * inherit the settings from this package object.
+ */
+package object sql {
+
+  protected[sql] def Logger(name: String) =
+    com.typesafe.scalalogging.slf4j.Logger(org.slf4j.LoggerFactory.getLogger(name))
+
+  protected[sql] type Logging = com.typesafe.scalalogging.slf4j.Logging
+
+  type Row = catalyst.expressions.Row
+
+  object Row {
+    /**
+     * This method can be used to extract fields from a [[Row]] object in a pattern match. Example:
+     * {{{
+     * import org.apache.spark.sql._
+     *
+     * val pairs = sql("SELECT key, value FROM src").rdd.map {
+     *   case Row(key: Int, value: String) =>
+     *     key -> value
+     * }
+     * }}}
+     */
+    def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
+  }
+}