You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/21 02:04:49 UTC
[6/9] SPARK-1251 Support for optimizing and executing structured
queries
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
new file mode 100644
index 0000000..4db2803
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package optimizer
+
+import catalyst.expressions._
+import catalyst.plans.logical._
+import catalyst.rules._
+import catalyst.types.BooleanType
+import catalyst.plans.Inner
+
+object Optimizer extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("Subqueries", Once,
+ EliminateSubqueries) ::
+ Batch("ConstantFolding", Once,
+ ConstantFolding,
+ BooleanSimplification,
+ SimplifyCasts) ::
+ Batch("Filter Pushdown", Once,
+ EliminateSubqueries,
+ CombineFilters,
+ PushPredicateThroughProject,
+ PushPredicateThroughInnerJoin) :: Nil
+}
+
+/**
+ * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan. Subqueries are
+ * only required to provide scoping information for attributes and can be removed once analysis is
+ * complete.
+ */
+object EliminateSubqueries extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case Subquery(_, child) => child
+ }
+}
+
+/**
+ * Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
+ * equivalent [[catalyst.expressions.Literal Literal]] values.
+ */
+object ConstantFolding extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case q: LogicalPlan => q transformExpressionsDown {
+ // Skip redundant folding of literals.
+ case l: Literal => l
+ case e if e.foldable => Literal(e.apply(null), e.dataType)
+ }
+ }
+}
+
+/**
+ * Simplifies boolean expressions where the answer can be determined without evaluating both sides.
+ * Note that this rule can eliminate expressions that might otherwise have been evaluated and thus
+ * is only safe when evaluations of expressions does not result in side effects.
+ */
+object BooleanSimplification extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case q: LogicalPlan => q transformExpressionsUp {
+ case and @ And(left, right) => {
+ (left, right) match {
+ case (Literal(true, BooleanType), r) => r
+ case (l, Literal(true, BooleanType)) => l
+ case (Literal(false, BooleanType), _) => Literal(false)
+ case (_, Literal(false, BooleanType)) => Literal(false)
+ case (_, _) => and
+ }
+ }
+ case or @ Or(left, right) => {
+ (left, right) match {
+ case (Literal(true, BooleanType), _) => Literal(true)
+ case (_, Literal(true, BooleanType)) => Literal(true)
+ case (Literal(false, BooleanType), r) => r
+ case (l, Literal(false, BooleanType)) => l
+ case (_, _) => or
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Combines two adjacent [[catalyst.plans.logical.Filter Filter]] operators into one, merging the
+ * conditions into one conjunctive predicate.
+ */
+object CombineFilters extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case ff@Filter(fc, nf@Filter(nc, grandChild)) => Filter(And(nc, fc), grandChild)
+ }
+}
+
+/**
+ * Pushes [[catalyst.plans.logical.Filter Filter]] operators through
+ * [[catalyst.plans.logical.Project Project]] operators, in-lining any
+ * [[catalyst.expressions.Alias Aliases]] that were defined in the projection.
+ *
+ * This heuristic is valid assuming the expression evaluation cost is minimal.
+ */
+object PushPredicateThroughProject extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case filter@Filter(condition, project@Project(fields, grandChild)) =>
+ val sourceAliases = fields.collect { case a@Alias(c, _) => a.toAttribute -> c }.toMap
+ project.copy(child = filter.copy(
+ replaceAlias(condition, sourceAliases),
+ grandChild))
+ }
+
+ //
+ def replaceAlias(condition: Expression, sourceAliases: Map[Attribute, Expression]): Expression = {
+ condition transform {
+ case a: AttributeReference => sourceAliases.getOrElse(a, a)
+ }
+ }
+}
+
+/**
+ * Pushes down [[catalyst.plans.logical.Filter Filter]] operators where the `condition` can be
+ * evaluated using only the attributes of the left or right side of an inner join. Other
+ * [[catalyst.plans.logical.Filter Filter]] conditions are moved into the `condition` of the
+ * [[catalyst.plans.logical.Join Join]].
+ */
+object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) =>
+ val allConditions =
+ splitConjunctivePredicates(filterCondition) ++
+ joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
+
+ // Split the predicates into those that can be evaluated on the left, right, and those that
+ // must be evaluated after the join.
+ val (rightConditions, leftOrJoinConditions) =
+ allConditions.partition(_.references subsetOf right.outputSet)
+ val (leftConditions, joinConditions) =
+ leftOrJoinConditions.partition(_.references subsetOf left.outputSet)
+
+ // Build the new left and right side, optionally with the pushed down filters.
+ val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+ val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+ Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And))
+ }
+}
+
+/**
+ * Removes [[catalyst.expressions.Cast Casts]] that are unnecessary because the input is already
+ * the correct type.
+ */
+object SimplifyCasts extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case Cast(e, dataType) if e.dataType == dataType => e
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
new file mode 100644
index 0000000..22f8ea0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package planning
+
+
+import plans.logical.LogicalPlan
+import trees._
+
+/**
+ * Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans.
+ * Child classes are responsible for specifying a list of [[Strategy]] objects that each of which
+ * can return a list of possible physical plan options. If a given strategy is unable to plan all
+ * of the remaining operators in the tree, it can call [[planLater]], which returns a placeholder
+ * object that will be filled in using other available strategies.
+ *
+ * TODO: RIGHT NOW ONLY ONE PLAN IS RETURNED EVER...
+ * PLAN SPACE EXPLORATION WILL BE IMPLEMENTED LATER.
+ *
+ * @tparam PhysicalPlan The type of physical plan produced by this [[QueryPlanner]]
+ */
+abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
+ /** A list of execution strategies that can be used by the planner */
+ def strategies: Seq[Strategy]
+
+ /**
+ * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
+ * be used for execution. If this strategy does not apply to the give logical operation then an
+ * empty list should be returned.
+ */
+ abstract protected class Strategy extends Logging {
+ def apply(plan: LogicalPlan): Seq[PhysicalPlan]
+ }
+
+ /**
+ * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
+ * filled in automatically by the QueryPlanner using the other execution strategies that are
+ * available.
+ */
+ protected def planLater(plan: LogicalPlan) = apply(plan).next()
+
+ def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
+ // Obviously a lot to do here still...
+ val iter = strategies.view.flatMap(_(plan)).toIterator
+ assert(iter.hasNext, s"No plan for $plan")
+ iter
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/package.scala
new file mode 100644
index 0000000..64370ec
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * Contains classes for enumerating possible physical plans for a given logical query plan.
+ */
+package object planning
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
new file mode 100644
index 0000000..613b028
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package planning
+
+import scala.annotation.tailrec
+
+import expressions._
+import plans.logical._
+
+/**
+ * A pattern that matches any number of filter operations on top of another relational operator.
+ * Adjacent filter operators are collected and their conditions are broken up and returned as a
+ * sequence of conjunctive predicates.
+ *
+ * @return A tuple containing a sequence of conjunctive predicates that should be used to filter the
+ * output and a relational operator.
+ */
+object FilteredOperation extends PredicateHelper {
+ type ReturnType = (Seq[Expression], LogicalPlan)
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = Some(collectFilters(Nil, plan))
+
+ @tailrec
+ private def collectFilters(filters: Seq[Expression], plan: LogicalPlan): ReturnType = plan match {
+ case Filter(condition, child) =>
+ collectFilters(filters ++ splitConjunctivePredicates(condition), child)
+ case other => (filters, other)
+ }
+}
+
+/**
+ * A pattern that matches any number of project or filter operations on top of another relational
+ * operator. All filter operators are collected and their conditions are broken up and returned
+ * together with the top project operator. [[Alias Aliases]] are in-lined/substituted if necessary.
+ */
+object PhysicalOperation extends PredicateHelper {
+ type ReturnType = (Seq[NamedExpression], Seq[Expression], LogicalPlan)
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = {
+ val (fields, filters, child, _) = collectProjectsAndFilters(plan)
+ Some((fields.getOrElse(child.output), filters, child))
+ }
+
+ /**
+ * Collects projects and filters, in-lining/substituting aliases if necessary. Here are two
+ * examples for alias in-lining/substitution. Before:
+ * {{{
+ * SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10
+ * SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10
+ * }}}
+ * After:
+ * {{{
+ * SELECT key AS c1 FROM t1 WHERE key > 10
+ * SELECT key AS c2 FROM t1 WHERE key > 10
+ * }}}
+ */
+ def collectProjectsAndFilters(plan: LogicalPlan):
+ (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression]) =
+ plan match {
+ case Project(fields, child) =>
+ val (_, filters, other, aliases) = collectProjectsAndFilters(child)
+ val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+ (Some(substitutedFields), filters, other, collectAliases(substitutedFields))
+
+ case Filter(condition, child) =>
+ val (fields, filters, other, aliases) = collectProjectsAndFilters(child)
+ val substitutedCondition = substitute(aliases)(condition)
+ (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
+
+ case other =>
+ (None, Nil, other, Map.empty)
+ }
+
+ def collectAliases(fields: Seq[Expression]) = fields.collect {
+ case a @ Alias(child, _) => a.toAttribute.asInstanceOf[Attribute] -> child
+ }.toMap
+
+ def substitute(aliases: Map[Attribute, Expression])(expr: Expression) = expr.transform {
+ case a @ Alias(ref: AttributeReference, name) =>
+ aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifiers)).getOrElse(a)
+
+ case a: AttributeReference =>
+ aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifiers)).getOrElse(a)
+ }
+}
+
+/**
+ * A pattern that collects all adjacent unions and returns their children as a Seq.
+ */
+object Unions {
+ def unapply(plan: LogicalPlan): Option[Seq[LogicalPlan]] = plan match {
+ case u: Union => Some(collectUnionChildren(u))
+ case _ => None
+ }
+
+ private def collectUnionChildren(plan: LogicalPlan): Seq[LogicalPlan] = plan match {
+ case Union(l, r) => collectUnionChildren(l) ++ collectUnionChildren(r)
+ case other => other :: Nil
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
new file mode 100644
index 0000000..20f230c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+
+import catalyst.expressions.{SortOrder, Attribute, Expression}
+import catalyst.trees._
+
+abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
+ self: PlanType with Product =>
+
+ def output: Seq[Attribute]
+
+ /**
+ * Returns the set of attributes that are output by this node.
+ */
+ def outputSet: Set[Attribute] = output.toSet
+
+ /**
+ * Runs [[transform]] with `rule` on all expressions present in this query operator.
+ * Users should not expect a specific directionality. If a specific directionality is needed,
+ * transformExpressionsDown or transformExpressionsUp should be used.
+ * @param rule the rule to be applied to every expression in this operator.
+ */
+ def transformExpressions(rule: PartialFunction[Expression, Expression]): this.type = {
+ transformExpressionsDown(rule)
+ }
+
+ /**
+ * Runs [[transformDown]] with `rule` on all expressions present in this query operator.
+ * @param rule the rule to be applied to every expression in this operator.
+ */
+ def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type = {
+ var changed = false
+
+ @inline def transformExpressionDown(e: Expression) = {
+ val newE = e.transformDown(rule)
+ if (newE.id != e.id && newE != e) {
+ changed = true
+ newE
+ } else {
+ e
+ }
+ }
+
+ val newArgs = productIterator.map {
+ case e: Expression => transformExpressionDown(e)
+ case Some(e: Expression) => Some(transformExpressionDown(e))
+ case m: Map[_,_] => m
+ case seq: Traversable[_] => seq.map {
+ case e: Expression => transformExpressionDown(e)
+ case other => other
+ }
+ case other: AnyRef => other
+ }.toArray
+
+ if (changed) makeCopy(newArgs) else this
+ }
+
+ /**
+ * Runs [[transformUp]] with `rule` on all expressions present in this query operator.
+ * @param rule the rule to be applied to every expression in this operator.
+ * @return
+ */
+ def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): this.type = {
+ var changed = false
+
+ @inline def transformExpressionUp(e: Expression) = {
+ val newE = e.transformUp(rule)
+ if (newE.id != e.id && newE != e) {
+ changed = true
+ newE
+ } else {
+ e
+ }
+ }
+
+ val newArgs = productIterator.map {
+ case e: Expression => transformExpressionUp(e)
+ case Some(e: Expression) => Some(transformExpressionUp(e))
+ case m: Map[_,_] => m
+ case seq: Traversable[_] => seq.map {
+ case e: Expression => transformExpressionUp(e)
+ case other => other
+ }
+ case other: AnyRef => other
+ }.toArray
+
+ if (changed) makeCopy(newArgs) else this
+ }
+
+ /** Returns the result of running [[transformExpressions]] on this node
+ * and all its children. */
+ def transformAllExpressions(rule: PartialFunction[Expression, Expression]): this.type = {
+ transform {
+ case q: QueryPlan[_] => q.transformExpressions(rule).asInstanceOf[PlanType]
+ }.asInstanceOf[this.type]
+ }
+
+ /** Returns all of the expressions present in this query plan operator. */
+ def expressions: Seq[Expression] = {
+ productIterator.flatMap {
+ case e: Expression => e :: Nil
+ case Some(e: Expression) => e :: Nil
+ case seq: Traversable[_] => seq.flatMap {
+ case e: Expression => e :: Nil
+ case other => Nil
+ }
+ case other => Nil
+ }.toSeq
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
new file mode 100644
index 0000000..9f2283a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+
+sealed abstract class JoinType
+case object Inner extends JoinType
+case object LeftOuter extends JoinType
+case object RightOuter extends JoinType
+case object FullOuter extends JoinType
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
new file mode 100644
index 0000000..48ff45c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+abstract class BaseRelation extends LeafNode {
+ self: Product =>
+
+ def tableName: String
+ def isPartitioned: Boolean = false
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
new file mode 100644
index 0000000..bc7b687
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import catalyst.expressions._
+import catalyst.errors._
+import catalyst.types.StructType
+
+abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
+ self: Product =>
+
+ /**
+ * Returns the set of attributes that are referenced by this node
+ * during evaluation.
+ */
+ def references: Set[Attribute]
+
+ /**
+ * Returns the set of attributes that this node takes as
+ * input from its children.
+ */
+ lazy val inputSet: Set[Attribute] = children.flatMap(_.output).toSet
+
+ /**
+ * Returns true if this expression and all its children have been resolved to a specific schema
+ * and false if it is still contains any unresolved placeholders. Implementations of LogicalPlan
+ * can override this (e.g. [[catalyst.analysis.UnresolvedRelation UnresolvedRelation]] should
+ * return `false`).
+ */
+ lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
+
+ /**
+ * Returns true if all its children of this query plan have been resolved.
+ */
+ def childrenResolved = !children.exists(!_.resolved)
+
+ /**
+ * Optionally resolves the given string to a
+ * [[catalyst.expressions.NamedExpression NamedExpression]]. The attribute is expressed as
+ * as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
+ */
+ def resolve(name: String): Option[NamedExpression] = {
+ val parts = name.split("\\.")
+ // Collect all attributes that are output by this nodes children where either the first part
+ // matches the name or where the first part matches the scope and the second part matches the
+ // name. Return these matches along with any remaining parts, which represent dotted access to
+ // struct fields.
+ val options = children.flatMap(_.output).flatMap { option =>
+ // If the first part of the desired name matches a qualifier for this possible match, drop it.
+ val remainingParts = if (option.qualifiers contains parts.head) parts.drop(1) else parts
+ if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
+ }
+
+ options.distinct match {
+ case (a, Nil) :: Nil => Some(a) // One match, no nested fields, use it.
+ // One match, but we also need to extract the requested nested field.
+ case (a, nestedFields) :: Nil =>
+ a.dataType match {
+ case StructType(fields) =>
+ Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
+ case _ => None // Don't know how to resolve these field references
+ }
+ case Nil => None // No matches.
+ case ambiguousReferences =>
+ throw new TreeNodeException(
+ this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
+ }
+ }
+}
+
+/**
+ * A logical plan node with no children.
+ */
+abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
+ self: Product =>
+
+ // Leaf nodes by definition cannot reference any input attributes.
+ def references = Set.empty
+}
+
+/**
+ * A logical node that represents a non-query command to be executed by the system. For example,
+ * commands can be used by parsers to represent DDL operations.
+ */
+abstract class Command extends LeafNode {
+ self: Product =>
+ def output = Seq.empty
+}
+
+/**
+ * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
+ * commands that are passed directly to another system.
+ */
+case class NativeCommand(cmd: String) extends Command
+
+/**
+ * Returned by a parser when the users only wants to see what query plan would be executed, without
+ * actually performing the execution.
+ */
+case class ExplainCommand(plan: LogicalPlan) extends Command
+
+/**
+ * A logical plan node with single child.
+ */
+abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
+ self: Product =>
+}
+
+/**
+ * A logical plan node with a left and right child.
+ */
+abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
+ self: Product =>
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
new file mode 100644
index 0000000..1a1a2b9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import expressions._
+
+/**
+ * Transforms the input by forking and running the specified script.
+ *
+ * @param input the set of expression that should be passed to the script.
+ * @param script the command that should be executed.
+ * @param output the attributes that are produced by the script.
+ */
+case class ScriptTransformation(
+ input: Seq[Expression],
+ script: String,
+ output: Seq[Attribute],
+ child: LogicalPlan) extends UnaryNode {
+ def references = input.flatMap(_.references).toSet
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
new file mode 100644
index 0000000..b5905a4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import expressions._
+import rules._
+
+object LocalRelation {
+ def apply(output: Attribute*) =
+ new LocalRelation(output)
+}
+
+case class LocalRelation(output: Seq[Attribute], data: Seq[Product] = Nil)
+ extends LeafNode with analysis.MultiInstanceRelation {
+
+ // TODO: Validate schema compliance.
+ def loadData(newData: Seq[Product]) = new LocalRelation(output, data ++ newData)
+
+ /**
+ * Returns an identical copy of this relation with new exprIds for all attributes. Different
+ * attributes are required when a relation is going to be included multiple times in the same
+ * query.
+ */
+ override final def newInstance: this.type = {
+ LocalRelation(output.map(_.newInstance), data).asInstanceOf[this.type]
+ }
+
+ override protected def stringArgs = Iterator(output)
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
new file mode 100644
index 0000000..8e98aab
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import expressions._
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+ def output = projectList.map(_.toAttribute)
+ def references = projectList.flatMap(_.references).toSet
+}
+
+/**
+ * Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the
+ * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional
+ * programming with one important additional feature, which allows the input rows to be joined with
+ * their output.
+ * @param join when true, each output row is implicitly joined with the input tuple that produced
+ * it.
+ * @param outer when true, each input row will be output at least once, even if the output of the
+ * given `generator` is empty. `outer` has no effect when `join` is false.
+ * @param alias when set, this string is applied to the schema of the output of the transformation
+ * as a qualifier.
+ */
+case class Generate(
+ generator: Generator,
+ join: Boolean,
+ outer: Boolean,
+ alias: Option[String],
+ child: LogicalPlan)
+ extends UnaryNode {
+
+ protected def generatorOutput =
+ alias
+ .map(a => generator.output.map(_.withQualifiers(a :: Nil)))
+ .getOrElse(generator.output)
+
+ def output =
+ if (join) child.output ++ generatorOutput else generatorOutput
+
+ def references =
+ if (join) child.outputSet else generator.references
+}
+
+case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
+ def output = child.output
+ def references = condition.references
+}
+
+case class Union(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+ // TODO: These aren't really the same attributes as nullability etc might change.
+ def output = left.output
+
+ override lazy val resolved =
+ childrenResolved &&
+ !left.output.zip(right.output).exists { case (l,r) => l.dataType != r.dataType }
+
+ def references = Set.empty
+}
+
+case class Join(
+ left: LogicalPlan,
+ right: LogicalPlan,
+ joinType: JoinType,
+ condition: Option[Expression]) extends BinaryNode {
+
+ def references = condition.map(_.references).getOrElse(Set.empty)
+ def output = left.output ++ right.output
+}
+
+case class InsertIntoTable(
+ table: BaseRelation,
+ partition: Map[String, Option[String]],
+ child: LogicalPlan,
+ overwrite: Boolean)
+ extends LogicalPlan {
+ // The table being inserted into is a child for the purposes of transformations.
+ def children = table :: child :: Nil
+ def references = Set.empty
+ def output = child.output
+
+ override lazy val resolved = childrenResolved && child.output.zip(table.output).forall {
+ case (childAttr, tableAttr) => childAttr.dataType == tableAttr.dataType
+ }
+}
+
+case class InsertIntoCreatedTable(
+ databaseName: Option[String],
+ tableName: String,
+ child: LogicalPlan) extends UnaryNode {
+ def references = Set.empty
+ def output = child.output
+}
+
+case class WriteToFile(
+ path: String,
+ child: LogicalPlan) extends UnaryNode {
+ def references = Set.empty
+ def output = child.output
+}
+
+case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode {
+ def output = child.output
+ def references = order.flatMap(_.references).toSet
+}
+
+case class Aggregate(
+ groupingExpressions: Seq[Expression],
+ aggregateExpressions: Seq[NamedExpression],
+ child: LogicalPlan)
+ extends UnaryNode {
+
+ def output = aggregateExpressions.map(_.toAttribute)
+ def references = child.references
+}
+
+case class StopAfter(limit: Expression, child: LogicalPlan) extends UnaryNode {
+ def output = child.output
+ def references = limit.references
+}
+
+case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
+ def output = child.output.map(_.withQualifiers(alias :: Nil))
+ def references = Set.empty
+}
+
+case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
+ extends UnaryNode {
+
+ def output = child.output
+ def references = Set.empty
+}
+
+case class Distinct(child: LogicalPlan) extends UnaryNode {
+ def output = child.output
+ def references = child.outputSet
+}
+
+case object NoRelation extends LeafNode {
+ def output = Nil
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
new file mode 100644
index 0000000..f7fcdc5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package logical
+
+import expressions._
+
+/**
+ * Performs a physical redistribution of the data. Used when the consumer of the query
+ * result have expectations about the distribution and ordering of partitioned input data.
+ */
+abstract class RedistributeData extends UnaryNode {
+ self: Product =>
+
+ def output = child.output
+}
+
+case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan)
+ extends RedistributeData {
+
+ def references = sortExpressions.flatMap(_.references).toSet
+}
+
+case class Repartition(partitionExpressions: Seq[Expression], child: LogicalPlan)
+ extends RedistributeData {
+
+ def references = partitionExpressions.flatMap(_.references).toSet
+}
+
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala
new file mode 100644
index 0000000..a40ab4b
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * A a collection of common abstractions for query plans as well as
+ * a base logical plan representation.
+ */
+package object plans
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
new file mode 100644
index 0000000..2d8f3ad
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package plans
+package physical
+
+import expressions._
+import types._
+
+/**
+ * Specifies how tuples that share common expressions will be distributed when a query is executed
+ * in parallel on many machines. Distribution can be used to refer to two distinct physical
+ * properties:
+ * - Inter-node partitioning of data: In this case the distribution describes how tuples are
+ * partitioned across physical machines in a cluster. Knowing this property allows some
+ * operators (e.g., Aggregate) to perform partition local operations instead of global ones.
+ * - Intra-partition ordering of data: In this case the distribution describes guarantees made
+ * about how tuples are distributed within a single partition.
+ */
+sealed trait Distribution
+
+/**
+ * Represents a distribution where no promises are made about co-location of data.
+ */
+case object UnspecifiedDistribution extends Distribution
+
+/**
+ * Represents a distribution that only has a single partition and all tuples of the dataset
+ * are co-located.
+ */
+case object AllTuples extends Distribution
+
+/**
+ * Represents data where tuples that share the same values for the `clustering`
+ * [[catalyst.expressions.Expression Expressions]] will be co-located. Based on the context, this
+ * can mean such tuples are either co-located in the same partition or they will be contiguous
+ * within a single partition.
+ */
+case class ClusteredDistribution(clustering: Seq[Expression]) extends Distribution {
+ require(
+ clustering != Nil,
+ "The clustering expressions of a ClusteredDistribution should not be Nil. " +
+ "An AllTuples should be used to represent a distribution that only has " +
+ "a single partition.")
+}
+
+/**
+ * Represents data where tuples have been ordered according to the `ordering`
+ * [[catalyst.expressions.Expression Expressions]]. This is a strictly stronger guarantee than
+ * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the same value for
+ * the ordering expressions are contiguous and will never be split across partitions.
+ */
+case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
+ require(
+ ordering != Nil,
+ "The ordering expressions of a OrderedDistribution should not be Nil. " +
+ "An AllTuples should be used to represent a distribution that only has " +
+ "a single partition.")
+
+ def clustering = ordering.map(_.child).toSet
+}
+
+sealed trait Partitioning {
+ /** Returns the number of partitions that the data is split across */
+ val numPartitions: Int
+
+ /**
+ * Returns true iff the guarantees made by this
+ * [[catalyst.plans.physical.Partitioning Partitioning]] are sufficient to satisfy
+ * the partitioning scheme mandated by the `required`
+ * [[catalyst.plans.physical.Distribution Distribution]], i.e. the current dataset does not
+ * need to be re-partitioned for the `required` Distribution (it is possible that tuples within
+ * a partition need to be reorganized).
+ */
+ def satisfies(required: Distribution): Boolean
+
+ /**
+ * Returns true iff all distribution guarantees made by this partitioning can also be made
+ * for the `other` specified partitioning.
+ * For example, two [[catalyst.plans.physical.HashPartitioning HashPartitioning]]s are
+ * only compatible if the `numPartitions` of them is the same.
+ */
+ def compatibleWith(other: Partitioning): Boolean
+}
+
+case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
+ override def satisfies(required: Distribution): Boolean = required match {
+ case UnspecifiedDistribution => true
+ case _ => false
+ }
+
+ override def compatibleWith(other: Partitioning): Boolean = other match {
+ case UnknownPartitioning(_) => true
+ case _ => false
+ }
+}
+
+case object SinglePartition extends Partitioning {
+ val numPartitions = 1
+
+ override def satisfies(required: Distribution): Boolean = true
+
+ override def compatibleWith(other: Partitioning) = other match {
+ case SinglePartition => true
+ case _ => false
+ }
+}
+
+case object BroadcastPartitioning extends Partitioning {
+ val numPartitions = 1
+
+ override def satisfies(required: Distribution): Boolean = true
+
+ override def compatibleWith(other: Partitioning) = other match {
+ case SinglePartition => true
+ case _ => false
+ }
+}
+
+/**
+ * Represents a partitioning where rows are split up across partitions based on the hash
+ * of `expressions`. All rows where `expressions` evaluate to the same values are guaranteed to be
+ * in the same partition.
+ */
+case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
+ extends Expression
+ with Partitioning {
+
+ def children = expressions
+ def references = expressions.flatMap(_.references).toSet
+ def nullable = false
+ def dataType = IntegerType
+
+ lazy val clusteringSet = expressions.toSet
+
+ override def satisfies(required: Distribution): Boolean = required match {
+ case UnspecifiedDistribution => true
+ case ClusteredDistribution(requiredClustering) =>
+ clusteringSet.subsetOf(requiredClustering.toSet)
+ case _ => false
+ }
+
+ override def compatibleWith(other: Partitioning) = other match {
+ case BroadcastPartitioning => true
+ case h: HashPartitioning if h == this => true
+ case _ => false
+ }
+}
+
+/**
+ * Represents a partitioning where rows are split across partitions based on some total ordering of
+ * the expressions specified in `ordering`. When data is partitioned in this manner the following
+ * two conditions are guaranteed to hold:
+ * - All row where the expressions in `ordering` evaluate to the same values will be in the same
+ * partition.
+ * - Each partition will have a `min` and `max` row, relative to the given ordering. All rows
+ * that are in between `min` and `max` in this `ordering` will reside in this partition.
+ */
+case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
+ extends Expression
+ with Partitioning {
+
+ def children = ordering
+ def references = ordering.flatMap(_.references).toSet
+ def nullable = false
+ def dataType = IntegerType
+
+ lazy val clusteringSet = ordering.map(_.child).toSet
+
+ override def satisfies(required: Distribution): Boolean = required match {
+ case UnspecifiedDistribution => true
+ case OrderedDistribution(requiredOrdering) =>
+ val minSize = Seq(requiredOrdering.size, ordering.size).min
+ requiredOrdering.take(minSize) == ordering.take(minSize)
+ case ClusteredDistribution(requiredClustering) =>
+ clusteringSet.subsetOf(requiredClustering.toSet)
+ case _ => false
+ }
+
+ override def compatibleWith(other: Partitioning) = other match {
+ case BroadcastPartitioning => true
+ case r: RangePartitioning if r == this => true
+ case _ => false
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
new file mode 100644
index 0000000..6ff4891
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package rules
+
+import trees._
+
+abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
+
+ /** Name for this rule, automatically inferred based on class name. */
+ val ruleName: String = {
+ val className = getClass.getName
+ if (className endsWith "$") className.dropRight(1) else className
+ }
+
+ def apply(plan: TreeType): TreeType
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
new file mode 100644
index 0000000..68ae30c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package rules
+
+import trees._
+import util._
+
+abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
+
+ /**
+ * An execution strategy for rules that indicates the maximum number of executions. If the
+ * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
+ */
+ abstract class Strategy { def maxIterations: Int }
+
+ /** A strategy that only runs once. */
+ case object Once extends Strategy { val maxIterations = 1 }
+
+ /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
+ case class FixedPoint(maxIterations: Int) extends Strategy
+
+ /** A batch of rules. */
+ protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
+
+ /** Defines a sequence of rule batches, to be overridden by the implementation. */
+ protected val batches: Seq[Batch]
+
+ /**
+ * Executes the batches of rules defined by the subclass. The batches are executed serially
+ * using the defined execution strategy. Within each batch, rules are also executed serially.
+ */
+ def apply(plan: TreeType): TreeType = {
+ var curPlan = plan
+
+ batches.foreach { batch =>
+ var iteration = 1
+ var lastPlan = curPlan
+ curPlan = batch.rules.foldLeft(curPlan) { case (curPlan, rule) => rule(curPlan) }
+
+ // Run until fix point (or the max number of iterations as specified in the strategy.
+ while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
+ lastPlan = curPlan
+ curPlan = batch.rules.foldLeft(curPlan) {
+ case (curPlan, rule) =>
+ val result = rule(curPlan)
+ if (!result.fastEquals(curPlan)) {
+ logger.debug(
+ s"""
+ |=== Applying Rule ${rule.ruleName} ===
+ |${sideBySide(curPlan.treeString, result.treeString).mkString("\n")}
+ """.stripMargin)
+ }
+
+ result
+ }
+ iteration += 1
+ }
+ }
+
+ curPlan
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/package.scala
new file mode 100644
index 0000000..26ab543
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * A framework for applying batches rewrite rules to trees, possibly to fixed point.
+ */
+package object rules
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
new file mode 100644
index 0000000..76ede87
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package trees
+
+import errors._
+
+object TreeNode {
+ private val currentId = new java.util.concurrent.atomic.AtomicLong
+ protected def nextId() = currentId.getAndIncrement()
+}
+
+/** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */
+private class MutableInt(var i: Int)
+
+abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
+ self: BaseType with Product =>
+
+ /** Returns a Seq of the children of this node */
+ def children: Seq[BaseType]
+
+ /**
+ * A globally unique id for this specific instance. Not preserved across copies.
+ * Unlike `equals`, `id` can be used to differentiate distinct but structurally
+ * identical branches of a tree.
+ */
+ val id = TreeNode.nextId()
+
+ /**
+ * Returns true if other is the same [[catalyst.trees.TreeNode TreeNode]] instance. Unlike
+ * `equals` this function will return false for different instances of structurally identical
+ * trees.
+ */
+ def sameInstance(other: TreeNode[_]): Boolean = {
+ this.id == other.id
+ }
+
+ /**
+ * Faster version of equality which short-circuits when two treeNodes are the same instance.
+ * We don't just override Object.Equals, as doing so prevents the scala compiler from from
+ * generating case class `equals` methods
+ */
+ def fastEquals(other: TreeNode[_]): Boolean = {
+ sameInstance(other) || this == other
+ }
+
+ /**
+ * Runs the given function on this node and then recursively on [[children]].
+ * @param f the function to be applied to each node in the tree.
+ */
+ def foreach(f: BaseType => Unit): Unit = {
+ f(this)
+ children.foreach(_.foreach(f))
+ }
+
+ /**
+ * Returns a Seq containing the result of applying the given function to each
+ * node in this tree in a preorder traversal.
+ * @param f the function to be applied.
+ */
+ def map[A](f: BaseType => A): Seq[A] = {
+ val ret = new collection.mutable.ArrayBuffer[A]()
+ foreach(ret += f(_))
+ ret
+ }
+
+ /**
+ * Returns a Seq by applying a function to all nodes in this tree and using the elements of the
+ * resulting collections.
+ */
+ def flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A] = {
+ val ret = new collection.mutable.ArrayBuffer[A]()
+ foreach(ret ++= f(_))
+ ret
+ }
+
+ /**
+ * Returns a Seq containing the result of applying a partial function to all elements in this
+ * tree on which the function is defined.
+ */
+ def collect[B](pf: PartialFunction[BaseType, B]): Seq[B] = {
+ val ret = new collection.mutable.ArrayBuffer[B]()
+ val lifted = pf.lift
+ foreach(node => lifted(node).foreach(ret.+=))
+ ret
+ }
+
+ /**
+ * Returns a copy of this node where `f` has been applied to all the nodes children.
+ */
+ def mapChildren(f: BaseType => BaseType): this.type = {
+ var changed = false
+ val newArgs = productIterator.map {
+ case arg: TreeNode[_] if children contains arg =>
+ val newChild = f(arg.asInstanceOf[BaseType])
+ if (newChild fastEquals arg) {
+ arg
+ } else {
+ changed = true
+ newChild
+ }
+ case nonChild: AnyRef => nonChild
+ case null => null
+ }.toArray
+ if (changed) makeCopy(newArgs) else this
+ }
+
+ /**
+ * Returns a copy of this node with the children replaced.
+ * TODO: Validate somewhere (in debug mode?) that children are ordered correctly.
+ */
+ def withNewChildren(newChildren: Seq[BaseType]): this.type = {
+ assert(newChildren.size == children.size, "Incorrect number of children")
+ var changed = false
+ val remainingNewChildren = newChildren.toBuffer
+ val remainingOldChildren = children.toBuffer
+ val newArgs = productIterator.map {
+ case arg: TreeNode[_] if children contains arg =>
+ val newChild = remainingNewChildren.remove(0)
+ val oldChild = remainingOldChildren.remove(0)
+ if (newChild fastEquals oldChild) {
+ oldChild
+ } else {
+ changed = true
+ newChild
+ }
+ case nonChild: AnyRef => nonChild
+ case null => null
+ }.toArray
+
+ if (changed) makeCopy(newArgs) else this
+ }
+
+ /**
+ * Returns a copy of this node where `rule` has been recursively applied to the tree.
+ * When `rule` does not apply to a given node it is left unchanged.
+ * Users should not expect a specific directionality. If a specific directionality is needed,
+ * transformDown or transformUp should be used.
+ * @param rule the function use to transform this nodes children
+ */
+ def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
+ transformDown(rule)
+ }
+
+ /**
+ * Returns a copy of this node where `rule` has been recursively applied to it and all of its
+ * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
+ * @param rule the function used to transform this nodes children
+ */
+ def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
+ val afterRule = rule.applyOrElse(this, identity[BaseType])
+ // Check if unchanged and then possibly return old copy to avoid gc churn.
+ if (this fastEquals afterRule) {
+ transformChildrenDown(rule)
+ } else {
+ afterRule.transformChildrenDown(rule)
+ }
+ }
+
+ /**
+ * Returns a copy of this node where `rule` has been recursively applied to all the children of
+ * this node. When `rule` does not apply to a given node it is left unchanged.
+ * @param rule the function used to transform this nodes children
+ */
+ def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {
+ var changed = false
+ val newArgs = productIterator.map {
+ case arg: TreeNode[_] if children contains arg =>
+ val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
+ if (!(newChild fastEquals arg)) {
+ changed = true
+ newChild
+ } else {
+ arg
+ }
+ case m: Map[_,_] => m
+ case args: Traversable[_] => args.map {
+ case arg: TreeNode[_] if children contains arg =>
+ val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
+ if (!(newChild fastEquals arg)) {
+ changed = true
+ newChild
+ } else {
+ arg
+ }
+ case other => other
+ }
+ case nonChild: AnyRef => nonChild
+ case null => null
+ }.toArray
+ if (changed) makeCopy(newArgs) else this
+ }
+
+ /**
+ * Returns a copy of this node where `rule` has been recursively applied first to all of its
+ * children and then itself (post-order). When `rule` does not apply to a given node, it is left
+ * unchanged.
+ * @param rule the function use to transform this nodes children
+ */
+ def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
+ val afterRuleOnChildren = transformChildrenUp(rule);
+ if (this fastEquals afterRuleOnChildren) {
+ rule.applyOrElse(this, identity[BaseType])
+ } else {
+ rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
+ }
+ }
+
+ def transformChildrenUp(rule: PartialFunction[BaseType, BaseType]): this.type = {
+ var changed = false
+ val newArgs = productIterator.map {
+ case arg: TreeNode[_] if children contains arg =>
+ val newChild = arg.asInstanceOf[BaseType].transformUp(rule)
+ if (!(newChild fastEquals arg)) {
+ changed = true
+ newChild
+ } else {
+ arg
+ }
+ case m: Map[_,_] => m
+ case args: Traversable[_] => args.map {
+ case arg: TreeNode[_] if children contains arg =>
+ val newChild = arg.asInstanceOf[BaseType].transformUp(rule)
+ if (!(newChild fastEquals arg)) {
+ changed = true
+ newChild
+ } else {
+ arg
+ }
+ case other => other
+ }
+ case nonChild: AnyRef => nonChild
+ case null => null
+ }.toArray
+ if (changed) makeCopy(newArgs) else this
+ }
+
+ /**
+ * Args to the constructor that should be copied, but not transformed.
+ * These are appended to the transformed args automatically by makeCopy
+ * @return
+ */
+ protected def otherCopyArgs: Seq[AnyRef] = Nil
+
+ /**
+ * Creates a copy of this type of tree node after a transformation.
+ * Must be overridden by child classes that have constructor arguments
+ * that are not present in the productIterator.
+ * @param newArgs the new product arguments.
+ */
+ def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {
+ try {
+ val defaultCtor = getClass.getConstructors.head
+ if (otherCopyArgs.isEmpty) {
+ defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type]
+ } else {
+ defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type]
+ }
+ } catch {
+ case e: java.lang.IllegalArgumentException =>
+ throw new TreeNodeException(
+ this, s"Failed to copy node. Is otherCopyArgs specified correctly for $nodeName?")
+ }
+ }
+
+ /** Returns the name of this type of TreeNode. Defaults to the class name. */
+ def nodeName = getClass.getSimpleName
+
+ /**
+ * The arguments that should be included in the arg string. Defaults to the `productIterator`.
+ */
+ protected def stringArgs = productIterator
+
+ /** Returns a string representing the arguments to this node, minus any children */
+ def argString: String = productIterator.flatMap {
+ case tn: TreeNode[_] if children contains tn => Nil
+ case tn: TreeNode[_] if tn.toString contains "\n" => s"(${tn.simpleString})" :: Nil
+ case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
+ case set: Set[_] => set.mkString("{", ",", "}") :: Nil
+ case other => other :: Nil
+ }.mkString(", ")
+
+ /** String representation of this node without any children */
+ def simpleString = s"$nodeName $argString"
+
+ override def toString: String = treeString
+
+ /** Returns a string representation of the nodes in this tree */
+ def treeString = generateTreeString(0, new StringBuilder).toString
+
+ /**
+ * Returns a string representation of the nodes in this tree, where each operator is numbered.
+ * The numbers can be used with [[trees.TreeNode.apply apply]] to easily access specific subtrees.
+ */
+ def numberedTreeString =
+ treeString.split("\n").zipWithIndex.map { case (line, i) => f"$i%02d $line" }.mkString("\n")
+
+ /**
+ * Returns the tree node at the specified number.
+ * Numbers for each node can be found in the [[numberedTreeString]].
+ */
+ def apply(number: Int): BaseType = getNodeNumbered(new MutableInt(number))
+
+ protected def getNodeNumbered(number: MutableInt): BaseType = {
+ if (number.i < 0) {
+ null.asInstanceOf[BaseType]
+ } else if (number.i == 0) {
+ this
+ } else {
+ number.i -= 1
+ children.map(_.getNodeNumbered(number)).find(_ != null).getOrElse(null.asInstanceOf[BaseType])
+ }
+ }
+
+ /** Appends the string represent of this node and its children to the given StringBuilder. */
+ protected def generateTreeString(depth: Int, builder: StringBuilder): StringBuilder = {
+ builder.append(" " * depth)
+ builder.append(simpleString)
+ builder.append("\n")
+ children.foreach(_.generateTreeString(depth + 1, builder))
+ builder
+ }
+}
+
+/**
+ * A [[TreeNode]] that has two children, [[left]] and [[right]].
+ */
+trait BinaryNode[BaseType <: TreeNode[BaseType]] {
+ def left: BaseType
+ def right: BaseType
+
+ def children = Seq(left, right)
+}
+
+/**
+ * A [[TreeNode]] with no children.
+ */
+trait LeafNode[BaseType <: TreeNode[BaseType]] {
+ def children = Nil
+}
+
+/**
+ * A [[TreeNode]] with a single [[child]].
+ */
+trait UnaryNode[BaseType <: TreeNode[BaseType]] {
+ def child: BaseType
+ def children = child :: Nil
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
new file mode 100644
index 0000000..e2da1d2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+/**
+ * A library for easily manipulating trees of operators. Operators that extend TreeNode are
+ * granted the following interface:
+ * <ul>
+ * <li>Scala collection like methods (foreach, map, flatMap, collect, etc)</li>
+ * <li>
+ * transform - accepts a partial function that is used to generate a new tree. When the
+ * partial function can be applied to a given tree segment, that segment is replaced with the
+ * result. After attempting to apply the partial function to a given node, the transform
+ * function recursively attempts to apply the function to that node's children.
+ * </li>
+ * <li>debugging support - pretty printing, easy splicing of trees, etc.</li>
+ * </ul>
+ */
+package object trees {
+ // Since we want tree nodes to be lightweight, we create one logger for all treenode instances.
+ protected val logger = Logger("catalyst.trees")
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
new file mode 100644
index 0000000..6eb2b62
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+package types
+
+import expressions.Expression
+
+abstract class DataType {
+ /** Matches any expression that evaluates to this DataType */
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType == this => true
+ case _ => false
+ }
+}
+
+case object NullType extends DataType
+
+abstract class NativeType extends DataType {
+ type JvmType
+ val ordering: Ordering[JvmType]
+}
+
+case object StringType extends NativeType {
+ type JvmType = String
+ val ordering = implicitly[Ordering[JvmType]]
+}
+case object BinaryType extends DataType {
+ type JvmType = Array[Byte]
+}
+case object BooleanType extends NativeType {
+ type JvmType = Boolean
+ val ordering = implicitly[Ordering[JvmType]]
+}
+
+abstract class NumericType extends NativeType {
+ // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
+ // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
+ // type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
+ // desugared by the compiler into an argument to the objects constructor. This means there is no
+ // longer an no argument constructor and thus the JVM cannot serialize the object anymore.
+ val numeric: Numeric[JvmType]
+}
+
+/** Matcher for any expressions that evaluate to [[IntegralType]]s */
+object IntegralType {
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType.isInstanceOf[IntegralType] => true
+ case _ => false
+ }
+}
+
+abstract class IntegralType extends NumericType {
+ val integral: Integral[JvmType]
+}
+
+case object LongType extends IntegralType {
+ type JvmType = Long
+ val numeric = implicitly[Numeric[Long]]
+ val integral = implicitly[Integral[Long]]
+ val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object IntegerType extends IntegralType {
+ type JvmType = Int
+ val numeric = implicitly[Numeric[Int]]
+ val integral = implicitly[Integral[Int]]
+ val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object ShortType extends IntegralType {
+ type JvmType = Short
+ val numeric = implicitly[Numeric[Short]]
+ val integral = implicitly[Integral[Short]]
+ val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object ByteType extends IntegralType {
+ type JvmType = Byte
+ val numeric = implicitly[Numeric[Byte]]
+ val integral = implicitly[Integral[Byte]]
+ val ordering = implicitly[Ordering[JvmType]]
+}
+
+/** Matcher for any expressions that evaluate to [[FractionalType]]s */
+object FractionalType {
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType.isInstanceOf[FractionalType] => true
+ case _ => false
+ }
+}
+abstract class FractionalType extends NumericType {
+ val fractional: Fractional[JvmType]
+}
+
+case object DecimalType extends FractionalType {
+ type JvmType = BigDecimal
+ val numeric = implicitly[Numeric[BigDecimal]]
+ val fractional = implicitly[Fractional[BigDecimal]]
+ val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object DoubleType extends FractionalType {
+ type JvmType = Double
+ val numeric = implicitly[Numeric[Double]]
+ val fractional = implicitly[Fractional[Double]]
+ val ordering = implicitly[Ordering[JvmType]]
+}
+
+case object FloatType extends FractionalType {
+ type JvmType = Float
+ val numeric = implicitly[Numeric[Float]]
+ val fractional = implicitly[Fractional[Float]]
+ val ordering = implicitly[Ordering[JvmType]]
+}
+
+case class ArrayType(elementType: DataType) extends DataType
+
+case class StructField(name: String, dataType: DataType, nullable: Boolean)
+case class StructType(fields: Seq[StructField]) extends DataType
+
+case class MapType(keyType: DataType, valueType: DataType) extends DataType
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
new file mode 100644
index 0000000..b65a561
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+/**
+ * Contains a type system for attributes produced by relations, including complex types like
+ * structs, arrays and maps.
+ */
+package object types
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
new file mode 100644
index 0000000..52adea2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package catalyst
+
+import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
+
+package object util {
+ /**
+ * Returns a path to a temporary file that probably does not exist.
+ * Note, there is always the race condition that someone created this
+ * file since the last time we checked. Thus, this shouldn't be used
+ * for anything security conscious.
+ */
+ def getTempFilePath(prefix: String, suffix: String = ""): File = {
+ val tempFile = File.createTempFile(prefix, suffix)
+ tempFile.delete()
+ tempFile
+ }
+
+ def fileToString(file: File, encoding: String = "UTF-8") = {
+ val inStream = new FileInputStream(file)
+ val outStream = new ByteArrayOutputStream
+ try {
+ var reading = true
+ while ( reading ) {
+ inStream.read() match {
+ case -1 => reading = false
+ case c => outStream.write(c)
+ }
+ }
+ outStream.flush()
+ }
+ finally {
+ inStream.close()
+ }
+ new String(outStream.toByteArray, encoding)
+ }
+
+ def resourceToString(
+ resource:String,
+ encoding: String = "UTF-8",
+ classLoader: ClassLoader = this.getClass.getClassLoader) = {
+ val inStream = classLoader.getResourceAsStream(resource)
+ val outStream = new ByteArrayOutputStream
+ try {
+ var reading = true
+ while ( reading ) {
+ inStream.read() match {
+ case -1 => reading = false
+ case c => outStream.write(c)
+ }
+ }
+ outStream.flush()
+ }
+ finally {
+ inStream.close()
+ }
+ new String(outStream.toByteArray, encoding)
+ }
+
+ def stringToFile(file: File, str: String): File = {
+ val out = new PrintWriter(file)
+ out.write(str)
+ out.close()
+ file
+ }
+
+ def sideBySide(left: String, right: String): Seq[String] = {
+ sideBySide(left.split("\n"), right.split("\n"))
+ }
+
+ def sideBySide(left: Seq[String], right: Seq[String]): Seq[String] = {
+ val maxLeftSize = left.map(_.size).max
+ val leftPadded = left ++ Seq.fill(math.max(right.size - left.size, 0))("")
+ val rightPadded = right ++ Seq.fill(math.max(left.size - right.size, 0))("")
+
+ leftPadded.zip(rightPadded).map {
+ case (l, r) => (if (l == r) " " else "!") + l + (" " * ((maxLeftSize - l.size) + 3)) + r
+ }
+ }
+
+ def stackTraceToString(t: Throwable): String = {
+ val out = new java.io.ByteArrayOutputStream
+ val writer = new PrintWriter(out)
+ t.printStackTrace(writer)
+ writer.flush()
+ new String(out.toByteArray)
+ }
+
+ def stringOrNull(a: AnyRef) = if (a == null) null else a.toString
+
+ def benchmark[A](f: => A): A = {
+ val startTime = System.nanoTime()
+ val ret = f
+ val endTime = System.nanoTime()
+ println(s"${(endTime - startTime).toDouble / 1000000}ms")
+ ret
+ }
+
+ /* FIX ME
+ implicit class debugLogging(a: AnyRef) {
+ def debugLogging() {
+ org.apache.log4j.Logger.getLogger(a.getClass.getName).setLevel(org.apache.log4j.Level.DEBUG)
+ }
+ } */
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9aadcffa/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
new file mode 100644
index 0000000..9ec3168
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Allows the execution of relational queries, including those expressed in SQL using Spark.
+ *
+ * Note that this package is located in catalyst instead of in core so that all subprojects can
+ * inherit the settings from this package object.
+ */
+package object sql {
+
+ protected[sql] def Logger(name: String) =
+ com.typesafe.scalalogging.slf4j.Logger(org.slf4j.LoggerFactory.getLogger(name))
+
+ protected[sql] type Logging = com.typesafe.scalalogging.slf4j.Logging
+
+ type Row = catalyst.expressions.Row
+
+ object Row {
+ /**
+ * This method can be used to extract fields from a [[Row]] object in a pattern match. Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val pairs = sql("SELECT key, value FROM src").rdd.map {
+ * case Row(key: Int, value: String) =>
+ * key -> value
+ * }
+ * }}}
+ */
+ def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
+ }
+}