Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/23 21:44:08 UTC

[2/2] spark git commit: [SPARK-14872][SQL] Restructure command package

[SPARK-14872][SQL] Restructure command package

## What changes were proposed in this pull request?
This patch restructures the sql.execution.command package to break the commands into multiple files, organized logically by subject: databases, tables, views, functions.

I also renamed the two basicOperators.scala files: the one in catalyst to basicLogicalOperators.scala and the one in sql/core to basicPhysicalOperators.scala.

## How was this patch tested?
N/A - all I did was move code around.

Author: Reynold Xin <rx...@databricks.com>

Closes #12636 from rxin/SPARK-14872.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c8a0ec9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c8a0ec9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c8a0ec9

Branch: refs/heads/master
Commit: 5c8a0ec99bded2271481f8d6cf5443fea5da4bbd
Parents: fddd3ae
Author: Reynold Xin <rx...@databricks.com>
Authored: Sat Apr 23 12:44:00 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sat Apr 23 12:44:00 2016 -0700

----------------------------------------------------------------------
 .../plans/logical/basicLogicalOperators.scala   | 709 +++++++++++++++++++
 .../catalyst/plans/logical/basicOperators.scala | 709 -------------------
 .../spark/sql/execution/basicOperators.scala    | 530 --------------
 .../sql/execution/basicPhysicalOperators.scala  | 530 ++++++++++++++
 .../spark/sql/execution/command/cache.scala     |  70 ++
 .../spark/sql/execution/command/commands.scala  | 264 +------
 .../spark/sql/execution/command/databases.scala |  64 ++
 .../spark/sql/execution/command/functions.scala |  99 ++-
 .../spark/sql/execution/command/tables.scala    |  77 +-
 9 files changed, 1556 insertions(+), 1496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c8a0ec9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
new file mode 100644
index 0000000..a445ce6
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.types._
+
+/**
+ * When planning take() or collect() operations, this special node is inserted at the top of
+ * the logical plan before invoking the query planner.
+ *
+ * Rules can pattern-match on this node in order to apply transformations that only take effect
+ * at the top of the logical query plan.
+ */
+case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+  override def maxRows: Option[Long] = child.maxRows
+
+  override lazy val resolved: Boolean = {
+    val hasSpecialExpressions = projectList.exists ( _.collect {
+        case agg: AggregateExpression => agg
+        case generator: Generator => generator
+        case window: WindowExpression => window
+      }.nonEmpty
+    )
+
+    !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
+  }
+
+  override def validConstraints: Set[Expression] =
+    child.constraints.union(getAliasedConstraints(projectList))
+}
+
+/**
+ * Applies a [[Generator]] to a stream of input rows, combining the
+ * output of each into a new stream of rows.  This operation is similar to a `flatMap` in functional
+ * programming with one important additional feature, which allows the input rows to be joined with
+ * their output.
+ *
+ * @param generator the generator expression
+ * @param join  when true, each output row is implicitly joined with the input tuple that produced
+ *              it.
+ * @param outer when true, each input row will be output at least once, even if the output of the
+ *              given `generator` is empty. `outer` has no effect when `join` is false.
+ * @param qualifier Qualifier for the attributes of the generator (UDTF)
+ * @param generatorOutput The output schema of the Generator.
+ * @param child Child logical plan node
+ */
+case class Generate(
+    generator: Generator,
+    join: Boolean,
+    outer: Boolean,
+    qualifier: Option[String],
+    generatorOutput: Seq[Attribute],
+    child: LogicalPlan)
+  extends UnaryNode {
+
+  /** The set of all attributes produced by this node. */
+  def generatedSet: AttributeSet = AttributeSet(generatorOutput)
+
+  override lazy val resolved: Boolean = {
+    generator.resolved &&
+      childrenResolved &&
+      generator.elementTypes.length == generatorOutput.length &&
+      generatorOutput.forall(_.resolved)
+  }
+
+  override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
+
+  def output: Seq[Attribute] = {
+    val qualified = qualifier.map(q =>
+      // prepend the new qualifier to the existing one
+      generatorOutput.map(a => a.withQualifier(Some(q)))
+    ).getOrElse(generatorOutput)
+
+    if (join) child.output ++ qualified else qualified
+  }
+}
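+
+// Illustrative sketch (hypothetical data): for an input row (id = 1, arr = [x, y])
+// and an explode-style generator over arr, join = true emits (1, x) and (1, y),
+// while join = false emits only (x) and (y). With outer = true, a row whose arr is
+// empty still produces one output row, with the generator columns set to null.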
+
+case class Filter(condition: Expression, child: LogicalPlan)
+  extends UnaryNode with PredicateHelper {
+  override def output: Seq[Attribute] = child.output
+
+  override def maxRows: Option[Long] = child.maxRows
+
+  override protected def validConstraints: Set[Expression] =
+    child.constraints.union(splitConjunctivePredicates(condition).toSet)
+}
+
+abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+
+  protected def leftConstraints: Set[Expression] = left.constraints
+
+  protected def rightConstraints: Set[Expression] = {
+    require(left.output.size == right.output.size)
+    val attributeRewrites = AttributeMap(right.output.zip(left.output))
+    right.constraints.map(_ transform {
+      case a: Attribute => attributeRewrites(a)
+    })
+  }
+}
+
+private[sql] object SetOperation {
+  def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
+}
+
+case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  override def output: Seq[Attribute] =
+    left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
+      leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
+    }
+
+  override protected def validConstraints: Set[Expression] =
+    leftConstraints.union(rightConstraints)
+
+  // Intersect is only resolved if it doesn't introduce ambiguous expression ids,
+  // since the Optimizer will convert Intersect to Join.
+  override lazy val resolved: Boolean =
+    childrenResolved &&
+      left.output.length == right.output.length &&
+      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
+      duplicateResolved
+
+  override def maxRows: Option[Long] = {
+    if (children.exists(_.maxRows.isEmpty)) {
+      None
+    } else {
+      Some(children.flatMap(_.maxRows).min)
+    }
+  }
+
+  override def statistics: Statistics = {
+    val leftSize = left.statistics.sizeInBytes
+    val rightSize = right.statistics.sizeInBytes
+    val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+}
+
+case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
+  /** We don't use right.output because those rows get excluded from the set. */
+  override def output: Seq[Attribute] = left.output
+
+  override protected def validConstraints: Set[Expression] = leftConstraints
+
+  override lazy val resolved: Boolean =
+    childrenResolved &&
+      left.output.length == right.output.length &&
+      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
+
+  override def statistics: Statistics = {
+    Statistics(sizeInBytes = left.statistics.sizeInBytes)
+  }
+}
+
+/** Factory for constructing new `Union` nodes. */
+object Union {
+  def apply(left: LogicalPlan, right: LogicalPlan): Union = {
+    Union (left :: right :: Nil)
+  }
+}
+
+case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
+  override def maxRows: Option[Long] = {
+    if (children.exists(_.maxRows.isEmpty)) {
+      None
+    } else {
+      Some(children.flatMap(_.maxRows).sum)
+    }
+  }
+
+  // updating nullability to make all the children consistent
+  override def output: Seq[Attribute] =
+    children.map(_.output).transpose.map(attrs =>
+      attrs.head.withNullability(attrs.exists(_.nullable)))
+
+  override lazy val resolved: Boolean = {
+    // allChildrenCompatible needs to be evaluated after childrenResolved
+    def allChildrenCompatible: Boolean =
+      children.tail.forall( child =>
+        // compare the attribute number with the first child
+        child.output.length == children.head.output.length &&
+        // compare the data types with the first child
+        child.output.zip(children.head.output).forall {
+          case (l, r) => l.dataType == r.dataType }
+      )
+
+    children.length > 1 && childrenResolved && allChildrenCompatible
+  }
+
+  override def statistics: Statistics = {
+    val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+
+  /**
+   * Maps the constraints containing a given (original) sequence of attributes to those with a
+   * given (reference) sequence of attributes. Given the nature of union, we expect that the
+   * mapping between the original and reference sequences is symmetric.
+   */
+  private def rewriteConstraints(
+      reference: Seq[Attribute],
+      original: Seq[Attribute],
+      constraints: Set[Expression]): Set[Expression] = {
+    require(reference.size == original.size)
+    val attributeRewrites = AttributeMap(original.zip(reference))
+    constraints.map(_ transform {
+      case a: Attribute => attributeRewrites(a)
+    })
+  }
+
+  private def merge(a: Set[Expression], b: Set[Expression]): Set[Expression] = {
+    val common = a.intersect(b)
+    // A constraint with only one reference can easily be inferred as a predicate.
+    // Group the constraints by their references so we can combine constraints with the
+    // same reference together.
+    val othera = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
+    val otherb = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
+    // loosen the constraints by: A1 && B1 || A2 && B2  ->  (A1 || A2) && (B1 || B2)
+    val others = (othera.keySet intersect otherb.keySet).map { attr =>
+      Or(othera(attr).reduceLeft(And), otherb(attr).reduceLeft(And))
+    }
+    common ++ others
+  }
+
+  override protected def validConstraints: Set[Expression] = {
+    children
+      .map(child => rewriteConstraints(children.head.output, child.output, child.constraints))
+      .reduce(merge(_, _))
+  }
+}
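+
+// Worked example of the merge above (hypothetical constraints): with
+// a = {x > 1, y > 2} and b = {x > 5, y > 0}, nothing is common and every
+// constraint references a single attribute, so the merged set is
+// {(x > 1 || x > 5), (y > 2 || y > 0)}; each disjunction holds for rows coming
+// from either side of the union.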
+
+case class Join(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    joinType: JoinType,
+    condition: Option[Expression])
+  extends BinaryNode with PredicateHelper {
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftExistence(_) =>
+        left.output
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case RightOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output
+      case FullOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  override protected def validConstraints: Set[Expression] = {
+    joinType match {
+      case Inner if condition.isDefined =>
+        left.constraints
+          .union(right.constraints)
+          .union(splitConjunctivePredicates(condition.get).toSet)
+      case LeftSemi if condition.isDefined =>
+        left.constraints
+          .union(splitConjunctivePredicates(condition.get).toSet)
+      case Inner =>
+        left.constraints.union(right.constraints)
+      case LeftExistence(_) =>
+        left.constraints
+      case LeftOuter =>
+        left.constraints
+      case RightOuter =>
+        right.constraints
+      case FullOuter =>
+        Set.empty[Expression]
+    }
+  }
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  // Joins are only resolved if they don't introduce ambiguous expression ids.
+  // NaturalJoin should be ready for resolution only if everything else is resolved here
+  lazy val resolvedExceptNatural: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      condition.forall(_.dataType == BooleanType)
+  }
+
+  // If this is not a natural join, use `resolvedExceptNatural`. If it is a natural or using
+  // join, we still need to eliminate the natural or using clauses before we mark it resolved.
+  override lazy val resolved: Boolean = joinType match {
+    case NaturalJoin(_) => false
+    case UsingJoin(_, _) => false
+    case _ => resolvedExceptNatural
+  }
+}
+
+/**
+ * A hint for the optimizer that we should broadcast the `child` if used in a join operator.
+ */
+case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  // We manually set the statistics of BroadcastHint to the smallest value to make sure
+  // the plan wrapped by BroadcastHint will be considered for broadcast later.
+  override def statistics: Statistics = Statistics(sizeInBytes = 1)
+}
+
+case class InsertIntoTable(
+    table: LogicalPlan,
+    partition: Map[String, Option[String]],
+    child: LogicalPlan,
+    overwrite: Boolean,
+    ifNotExists: Boolean)
+  extends LogicalPlan {
+
+  override def children: Seq[LogicalPlan] = child :: Nil
+  override def output: Seq[Attribute] = Seq.empty
+
+  assert(overwrite || !ifNotExists)
+  override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
+    case (childAttr, tableAttr) =>
+      DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
+  }
+}
+
+/**
+ * A container for holding named common table expressions (CTEs) and a query plan.
+ * This operator will be removed during analysis and the relations will be substituted into child.
+ *
+ * @param child The final query of this CTE.
+ * @param cteRelations Queries that this CTE defines;
+ *                     each key is the alias of a CTE definition,
+ *                     and each value is the CTE definition itself.
+ */
+case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
+
+case class WithWindowDefinition(
+    windowDefinitions: Map[String, WindowSpecDefinition],
+    child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * @param order  The ordering expressions
+ * @param global True means global sorting applies to the entire data set;
+ *               False means sorting applies only within each partition.
+ * @param child  Child logical plan
+ */
+case class Sort(
+    order: Seq[SortOrder],
+    global: Boolean,
+    child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def maxRows: Option[Long] = child.maxRows
+}
+
+/** Factory for constructing new `Range` nodes. */
+object Range {
+  def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
+    val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
+    new Range(start, end, step, numSlices, output)
+  }
+}
+
+case class Range(
+    start: Long,
+    end: Long,
+    step: Long,
+    numSlices: Int,
+    output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+  require(step != 0, "step cannot be 0")
+  val numElements: BigInt = {
+    val safeStart = BigInt(start)
+    val safeEnd = BigInt(end)
+    if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
+      (safeEnd - safeStart) / step
+    } else {
+      // the remainder has the same sign as the range, so we add 1 more element
+      (safeEnd - safeStart) / step + 1
+    }
+  }
+
+  override def newInstance(): Range =
+    Range(start, end, step, numSlices, output.map(_.newInstance()))
+
+  override def statistics: Statistics = {
+    val sizeInBytes = LongType.defaultSize * numElements
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+}
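+
+// numElements sketch: for Range(start = 0, end = 10, step = 3), (10 - 0) % 3 != 0
+// and the range is non-empty, so numElements = (10 - 0) / 3 + 1 = 4, covering the
+// values 0, 3, 6 and 9.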
+
+case class Aggregate(
+    groupingExpressions: Seq[Expression],
+    aggregateExpressions: Seq[NamedExpression],
+    child: LogicalPlan)
+  extends UnaryNode {
+
+  override lazy val resolved: Boolean = {
+    val hasWindowExpressions = aggregateExpressions.exists ( _.collect {
+        case window: WindowExpression => window
+      }.nonEmpty
+    )
+
+    !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
+  }
+
+  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
+  override def maxRows: Option[Long] = child.maxRows
+
+  override def validConstraints: Set[Expression] =
+    child.constraints.union(getAliasedConstraints(aggregateExpressions))
+
+  override def statistics: Statistics = {
+    if (groupingExpressions.isEmpty) {
+      Statistics(sizeInBytes = 1)
+    } else {
+      super.statistics
+    }
+  }
+}
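+
+// Statistics sketch: a global aggregate such as SELECT count(*) FROM t has no
+// grouping expressions and produces exactly one row, so its size estimate is
+// pinned to 1 byte, the same "as small as possible" convention BroadcastHint uses.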
+
+case class Window(
+    windowExpressions: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: LogicalPlan) extends UnaryNode {
+
+  override def output: Seq[Attribute] =
+    child.output ++ windowExpressions.map(_.toAttribute)
+
+  def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute))
+}
+
+private[sql] object Expand {
+  /**
+   * Extract attribute set according to the grouping id.
+   *
+   * @param bitmask bitmask over the attribute sequence; a set bit marks a non-selected attribute
+   * @param attrs the attributes in sequence
+   * @return the attributes that are not selected, as specified via the bitmask (bit set to 1)
+   */
+  private def buildNonSelectAttrSet(
+      bitmask: Int,
+      attrs: Seq[Attribute]): AttributeSet = {
+    val nonSelect = new ArrayBuffer[Attribute]()
+
+    var bit = attrs.length - 1
+    while (bit >= 0) {
+      if (((bitmask >> bit) & 1) == 1) nonSelect += attrs(attrs.length - bit - 1)
+      bit -= 1
+    }
+
+    AttributeSet(nonSelect)
+  }
+
+  /**
+   * Apply all of the GroupExpressions to every input row; hence we will get
+   * multiple output rows for each input row.
+   *
+   * @param bitmasks The set of bitmasks that represents the grouping sets
+   * @param groupByAliases The aliased original group by expressions
+   * @param groupByAttrs The attributes of aliased group by expressions
+   * @param gid Attribute of the grouping id
+   * @param child Child operator
+   */
+  def apply(
+    bitmasks: Seq[Int],
+    groupByAliases: Seq[Alias],
+    groupByAttrs: Seq[Attribute],
+    gid: Attribute,
+    child: LogicalPlan): Expand = {
+    // Create an array of Projections for the child projection, and replace the projections'
+    // expressions which equal GroupBy expressions with Literal(null), if those expressions
+    // are not set for this grouping set (according to the bit mask).
+    val projections = bitmasks.map { bitmask =>
+      // get the non selected grouping attributes according to the bit mask
+      val nonSelectedGroupAttrSet = buildNonSelectAttrSet(bitmask, groupByAttrs)
+
+      child.output ++ groupByAttrs.map { attr =>
+        if (nonSelectedGroupAttrSet.contains(attr)) {
+          // if the input attribute is in the non-selected grouping expression set for this
+          // grouping set, replace it with a constant null
+          Literal.create(null, attr.dataType)
+        } else {
+          attr
+        }
+      // groupingId is the last output, here we use the bit mask as the concrete value for it.
+      } :+ Literal.create(bitmask, IntegerType)
+    }
+
+    // the `groupByAttrs` have a different meaning in `Expand.output`: each could be the
+    // original grouping expression or null, so here we create new instances of them.
+    val output = child.output ++ groupByAttrs.map(_.newInstance) :+ gid
+    Expand(projections, output, Project(child.output ++ groupByAliases, child))
+  }
+}
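+
+// Bitmask sketch (hypothetical grouping): with attrs = Seq(a, b) and grouping sets
+// covering all combinations, the bitmasks would be 0, 1, 2 and 3. Bit 0 maps to the
+// last attribute, so bitmask 1 (binary 01) marks b as non-selected: that projection
+// keeps a, replaces b with Literal.create(null, b.dataType) and appends the grouping
+// id Literal.create(1, IntegerType).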
+
+/**
+ * Apply a number of projections to every input row, hence we will get multiple output rows for
+ * an input row.
+ *
+ * @param projections to apply
+ * @param output of all projections.
+ * @param child operator.
+ */
+case class Expand(
+    projections: Seq[Seq[Expression]],
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  override def references: AttributeSet =
+    AttributeSet(projections.flatten.flatMap(_.references))
+
+  override def statistics: Statistics = {
+    val sizeInBytes = super.statistics.sizeInBytes * projections.length
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+
+  // This operator can reuse attributes (for example making them null when doing a roll up) so
+  // the constraints of the child may no longer be valid.
+  override protected def validConstraints: Set[Expression] = Set.empty[Expression]
+}
+
+/**
+ * A GROUP BY clause with GROUPING SETS can generate a result set equivalent
+ * to that generated by a UNION ALL of multiple simple GROUP BY clauses.
+ *
+ * We will transform GROUPING SETS into the logical plan Aggregate(.., Expand) in the Analyzer
+ *
+ * @param bitmasks     A list of bitmasks; each bitmask indicates the selected
+ *                     GroupBy expressions
+ * @param groupByExprs The candidate Group By expressions; they take effect only if the
+ *                     associated bit in the bitmask is set to 1.
+ * @param child        Child operator
+ * @param aggregations The aggregation expressions; non-selected group by expressions
+ *                     are treated as constant null if they appear in these expressions
+ */
+case class GroupingSets(
+    bitmasks: Seq[Int],
+    groupByExprs: Seq[Expression],
+    child: LogicalPlan,
+    aggregations: Seq[NamedExpression]) extends UnaryNode {
+
+  override def output: Seq[Attribute] = aggregations.map(_.toAttribute)
+
+  // Needs to be unresolved before it's translated to Aggregate + Expand because output attributes
+  // will change in analysis.
+  override lazy val resolved: Boolean = false
+}
+
+case class Pivot(
+    groupByExprs: Seq[NamedExpression],
+    pivotColumn: Expression,
+    pivotValues: Seq[Literal],
+    aggregates: Seq[Expression],
+    child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = groupByExprs.map(_.toAttribute) ++ aggregates match {
+    case agg :: Nil => pivotValues.map(value => AttributeReference(value.toString, agg.dataType)())
+    case _ => pivotValues.flatMap{ value =>
+      aggregates.map(agg => AttributeReference(value + "_" + agg.sql, agg.dataType)())
+    }
+  }
+}
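+
+// Output-naming sketch (hypothetical query): pivoting on a year column with
+// pivotValues (2015, 2016) and the single aggregate sum(earnings) yields output
+// columns named "2015" and "2016"; with more than one aggregate, each name is the
+// pivot value joined to the aggregate's SQL text, roughly "2015_sum(`earnings`)".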
+
+object Limit {
+  def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
+    GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
+  }
+
+  def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
+    p match {
+      case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
+      case _ => None
+    }
+  }
+}
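+
+// Sketch (assuming any resolved `plan`): Limit(Literal(10), plan) builds
+//
+//   GlobalLimit(Literal(10), LocalLimit(Literal(10), plan))
+//
+// so each partition is capped at 10 rows locally before the final global cap is
+// applied, and Limit.unapply recovers (Literal(10), plan) because the two limit
+// expressions match.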
+
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def maxRows: Option[Long] = {
+    limitExpr match {
+      case IntegerLiteral(limit) => Some(limit)
+      case _ => None
+    }
+  }
+  override lazy val statistics: Statistics = {
+    val limit = limitExpr.eval().asInstanceOf[Int]
+    val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+}
+
+case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def maxRows: Option[Long] = {
+    limitExpr match {
+      case IntegerLiteral(limit) => Some(limit)
+      case _ => None
+    }
+  }
+  override lazy val statistics: Statistics = {
+    val limit = limitExpr.eval().asInstanceOf[Int]
+    val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+}
+
+case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
+}
+
+/**
+ * Sample the dataset.
+ *
+ * @param lowerBound Lower-bound of the sampling probability (usually 0.0)
+ * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
+ *                   will be ub - lb.
+ * @param withReplacement Whether to sample with replacement.
+ * @param seed the random seed
+ * @param child the LogicalPlan
+ * @param isTableSample Whether this node was created from TABLESAMPLE in the parser.
+ */
+case class Sample(
+    lowerBound: Double,
+    upperBound: Double,
+    withReplacement: Boolean,
+    seed: Long,
+    child: LogicalPlan)(
+    val isTableSample: java.lang.Boolean = false) extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def statistics: Statistics = {
+    val ratio = upperBound - lowerBound
+    // BigInt can't multiply with Double
+    var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100
+    if (sizeInBytes == 0) {
+      sizeInBytes = 1
+    }
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+
+  override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil
+}
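+
+// Statistics sketch: with lowerBound = 0.0 and upperBound = 0.1 the ratio 0.1 is
+// scaled to the integer 10, so sizeInBytes becomes the child size * 10 / 100; a
+// result of 0 is bumped to 1 so downstream planning never sees an empty estimate.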
+
+/**
+ * Returns a new logical plan that deduplicates input rows.
+ */
+case class Distinct(child: LogicalPlan) extends UnaryNode {
+  override def maxRows: Option[Long] = child.maxRows
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
+ * [[RepartitionByExpression]] as this method is called directly by DataFrames, because the user
+ * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
+ * of the output requires some specific ordering or distribution of the data.
+ */
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
+  extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * A relation with one row. This is used in "SELECT ..." without a from clause.
+ */
+case object OneRowRelation extends LeafNode {
+  override def maxRows: Option[Long] = Some(1)
+  override def output: Seq[Attribute] = Nil
+
+  /**
+   * Computes [[Statistics]] for this plan. The default implementation assumes the output
+   * cardinality is the product of all child plans' cardinalities, i.e. applies in the case
+   * of cartesian joins.
+   *
+   * [[LeafNode]]s must override this.
+   */
+  override def statistics: Statistics = Statistics(sizeInBytes = 1)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5c8a0ec9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
deleted file mode 100644
index a445ce6..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ /dev/null
@@ -1,709 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.types._
-
-/**
- * When planning take() or collect() operations, this special node is inserted at the top of
- * the logical plan before invoking the query planner.
- *
- * Rules can pattern-match on this node in order to apply transformations that only take effect
- * at the top of the logical query plan.
- */
-case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-  override def maxRows: Option[Long] = child.maxRows
-
-  override lazy val resolved: Boolean = {
-    val hasSpecialExpressions = projectList.exists ( _.collect {
-        case agg: AggregateExpression => agg
-        case generator: Generator => generator
-        case window: WindowExpression => window
-      }.nonEmpty
-    )
-
-    !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
-  }
-
-  override def validConstraints: Set[Expression] =
-    child.constraints.union(getAliasedConstraints(projectList))
-}
-
-/**
- * Applies a [[Generator]] to a stream of input rows, combining the
- * output of each into a new stream of rows.  This operation is similar to a `flatMap` in functional
- * programming with one important additional feature, which allows the input rows to be joined with
- * their output.
- *
- * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the input tuple that produced
- *              it.
- * @param outer when true, each input row will be output at least once, even if the output of the
- *              given `generator` is empty. `outer` has no effect when `join` is false.
- * @param qualifier Qualifier for the attributes of the generator (UDTF)
- * @param generatorOutput The output schema of the Generator.
- * @param child Child logical plan node
- */
-case class Generate(
-    generator: Generator,
-    join: Boolean,
-    outer: Boolean,
-    qualifier: Option[String],
-    generatorOutput: Seq[Attribute],
-    child: LogicalPlan)
-  extends UnaryNode {
-
-  /** The set of all attributes produced by this node. */
-  def generatedSet: AttributeSet = AttributeSet(generatorOutput)
-
-  override lazy val resolved: Boolean = {
-    generator.resolved &&
-      childrenResolved &&
-      generator.elementTypes.length == generatorOutput.length &&
-      generatorOutput.forall(_.resolved)
-  }
-
-  override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
-
-  def output: Seq[Attribute] = {
-    val qualified = qualifier.map(q =>
-      // prepend the new qualifier to the existing one
-      generatorOutput.map(a => a.withQualifier(Some(q)))
-    ).getOrElse(generatorOutput)
-
-    if (join) child.output ++ qualified else qualified
-  }
-}
-
-case class Filter(condition: Expression, child: LogicalPlan)
-  extends UnaryNode with PredicateHelper {
-  override def output: Seq[Attribute] = child.output
-
-  override def maxRows: Option[Long] = child.maxRows
-
-  override protected def validConstraints: Set[Expression] =
-    child.constraints.union(splitConjunctivePredicates(condition).toSet)
-}
-
-abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
-
-  protected def leftConstraints: Set[Expression] = left.constraints
-
-  protected def rightConstraints: Set[Expression] = {
-    require(left.output.size == right.output.size)
-    val attributeRewrites = AttributeMap(right.output.zip(left.output))
-    right.constraints.map(_ transform {
-      case a: Attribute => attributeRewrites(a)
-    })
-  }
-}
-
-private[sql] object SetOperation {
-  def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
-}
-
-case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
-
-  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
-
-  override def output: Seq[Attribute] =
-    left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
-      leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
-    }
-
-  override protected def validConstraints: Set[Expression] =
-    leftConstraints.union(rightConstraints)
-
-  // Intersect is only resolved if it doesn't introduce ambiguous expression ids,
-  // since the Optimizer will convert Intersect to Join.
-  override lazy val resolved: Boolean =
-    childrenResolved &&
-      left.output.length == right.output.length &&
-      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
-      duplicateResolved
-
-  override def maxRows: Option[Long] = {
-    if (children.exists(_.maxRows.isEmpty)) {
-      None
-    } else {
-      Some(children.flatMap(_.maxRows).min)
-    }
-  }
-
-  override def statistics: Statistics = {
-    val leftSize = left.statistics.sizeInBytes
-    val rightSize = right.statistics.sizeInBytes
-    val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-}
-
-case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
-  /** We don't use right.output because those rows get excluded from the set. */
-  override def output: Seq[Attribute] = left.output
-
-  override protected def validConstraints: Set[Expression] = leftConstraints
-
-  override lazy val resolved: Boolean =
-    childrenResolved &&
-      left.output.length == right.output.length &&
-      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
-
-  override def statistics: Statistics = {
-    Statistics(sizeInBytes = left.statistics.sizeInBytes)
-  }
-}
-
-/** Factory for constructing new `Union` nodes. */
-object Union {
-  def apply(left: LogicalPlan, right: LogicalPlan): Union = {
-    Union (left :: right :: Nil)
-  }
-}
-
-case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
-  override def maxRows: Option[Long] = {
-    if (children.exists(_.maxRows.isEmpty)) {
-      None
-    } else {
-      Some(children.flatMap(_.maxRows).sum)
-    }
-  }
-
-  // updating nullability to make all the children consistent
-  override def output: Seq[Attribute] =
-    children.map(_.output).transpose.map(attrs =>
-      attrs.head.withNullability(attrs.exists(_.nullable)))
-
-  override lazy val resolved: Boolean = {
-    // allChildrenCompatible needs to be evaluated after childrenResolved
-    def allChildrenCompatible: Boolean =
-      children.tail.forall( child =>
-        // compare the attribute number with the first child
-        child.output.length == children.head.output.length &&
-        // compare the data types with the first child
-        child.output.zip(children.head.output).forall {
-          case (l, r) => l.dataType == r.dataType }
-      )
-
-    children.length > 1 && childrenResolved && allChildrenCompatible
-  }
-
-  override def statistics: Statistics = {
-    val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-
-  /**
-   * Maps the constraints containing a given (original) sequence of attributes to those with a
-   * given (reference) sequence of attributes. Given the nature of union, we expect that the
- * mapping between the original and reference sequences is symmetric.
-   */
-  private def rewriteConstraints(
-      reference: Seq[Attribute],
-      original: Seq[Attribute],
-      constraints: Set[Expression]): Set[Expression] = {
-    require(reference.size == original.size)
-    val attributeRewrites = AttributeMap(original.zip(reference))
-    constraints.map(_ transform {
-      case a: Attribute => attributeRewrites(a)
-    })
-  }
-
-  private def merge(a: Set[Expression], b: Set[Expression]): Set[Expression] = {
-    val common = a.intersect(b)
-    // A constraint with only one reference can easily be inferred as a predicate.
-    // Group the constraints by their references so we can combine constraints with the
-    // same reference together.
-    val othera = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
-    val otherb = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
-    // loosen the constraints by: A1 && B1 || A2 && B2  ->  (A1 || A2) && (B1 || B2)
-    val others = (othera.keySet intersect otherb.keySet).map { attr =>
-      Or(othera(attr).reduceLeft(And), otherb(attr).reduceLeft(And))
-    }
-    common ++ others
-  }
-
-  override protected def validConstraints: Set[Expression] = {
-    children
-      .map(child => rewriteConstraints(children.head.output, child.output, child.constraints))
-      .reduce(merge(_, _))
-  }
-}
-
-case class Join(
-    left: LogicalPlan,
-    right: LogicalPlan,
-    joinType: JoinType,
-    condition: Option[Expression])
-  extends BinaryNode with PredicateHelper {
-
-  override def output: Seq[Attribute] = {
-    joinType match {
-      case LeftExistence(_) =>
-        left.output
-      case LeftOuter =>
-        left.output ++ right.output.map(_.withNullability(true))
-      case RightOuter =>
-        left.output.map(_.withNullability(true)) ++ right.output
-      case FullOuter =>
-        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
-      case _ =>
-        left.output ++ right.output
-    }
-  }
-
-  override protected def validConstraints: Set[Expression] = {
-    joinType match {
-      case Inner if condition.isDefined =>
-        left.constraints
-          .union(right.constraints)
-          .union(splitConjunctivePredicates(condition.get).toSet)
-      case LeftSemi if condition.isDefined =>
-        left.constraints
-          .union(splitConjunctivePredicates(condition.get).toSet)
-      case Inner =>
-        left.constraints.union(right.constraints)
-      case LeftExistence(_) =>
-        left.constraints
-      case LeftOuter =>
-        left.constraints
-      case RightOuter =>
-        right.constraints
-      case FullOuter =>
-        Set.empty[Expression]
-    }
-  }
-
-  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
-
-  // Joins are only resolved if they don't introduce ambiguous expression ids.
-  // NaturalJoin should be ready for resolution only if everything else is resolved here
-  lazy val resolvedExceptNatural: Boolean = {
-    childrenResolved &&
-      expressions.forall(_.resolved) &&
-      duplicateResolved &&
-      condition.forall(_.dataType == BooleanType)
-  }
-
-  // If this is not a natural join, use `resolvedExceptNatural`. If it is a natural or using
-  // join, we still need to eliminate the natural or using clauses before we mark it resolved.
-  override lazy val resolved: Boolean = joinType match {
-    case NaturalJoin(_) => false
-    case UsingJoin(_, _) => false
-    case _ => resolvedExceptNatural
-  }
-}
-
-/**
- * A hint for the optimizer that we should broadcast the `child` if used in a join operator.
- */
-case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-
-  // We manually set the statistics of BroadcastHint to the smallest value to make sure
-  // the plan wrapped by BroadcastHint will be considered for broadcast later.
-  override def statistics: Statistics = Statistics(sizeInBytes = 1)
-}
-
-case class InsertIntoTable(
-    table: LogicalPlan,
-    partition: Map[String, Option[String]],
-    child: LogicalPlan,
-    overwrite: Boolean,
-    ifNotExists: Boolean)
-  extends LogicalPlan {
-
-  override def children: Seq[LogicalPlan] = child :: Nil
-  override def output: Seq[Attribute] = Seq.empty
-
-  assert(overwrite || !ifNotExists)
-  override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
-    case (childAttr, tableAttr) =>
-      DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
-  }
-}
-
-/**
- * A container for holding named common table expressions (CTEs) and a query plan.
- * This operator will be removed during analysis and the relations will be substituted into child.
- *
- * @param child The final query of this CTE.
- * @param cteRelations Queries that this CTE defines;
- *                     each key is the alias of a CTE definition,
- *                     and each value is the CTE definition itself.
- */
-case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
-case class WithWindowDefinition(
-    windowDefinitions: Map[String, WindowSpecDefinition],
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
-/**
- * @param order  The ordering expressions
- * @param global True means global sorting applies to the entire data set;
- *               False means sorting applies only within each partition.
- * @param child  Child logical plan
- */
-case class Sort(
-    order: Seq[SortOrder],
-    global: Boolean,
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-  override def maxRows: Option[Long] = child.maxRows
-}
-
-/** Factory for constructing new `Range` nodes. */
-object Range {
-  def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
-    val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
-    new Range(start, end, step, numSlices, output)
-  }
-}
-
-case class Range(
-    start: Long,
-    end: Long,
-    step: Long,
-    numSlices: Int,
-    output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
-  require(step != 0, "step cannot be 0")
-  val numElements: BigInt = {
-    val safeStart = BigInt(start)
-    val safeEnd = BigInt(end)
-    if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
-      (safeEnd - safeStart) / step
-    } else {
-      // the remainder has the same sign as the range, so we add 1 more element
-      (safeEnd - safeStart) / step + 1
-    }
-  }
-
-  override def newInstance(): Range =
-    Range(start, end, step, numSlices, output.map(_.newInstance()))
-
-  override def statistics: Statistics = {
-    val sizeInBytes = LongType.defaultSize * numElements
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-}
-
-case class Aggregate(
-    groupingExpressions: Seq[Expression],
-    aggregateExpressions: Seq[NamedExpression],
-    child: LogicalPlan)
-  extends UnaryNode {
-
-  override lazy val resolved: Boolean = {
-    val hasWindowExpressions = aggregateExpressions.exists ( _.collect {
-        case window: WindowExpression => window
-      }.nonEmpty
-    )
-
-    !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
-  }
-
-  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
-  override def maxRows: Option[Long] = child.maxRows
-
-  override def validConstraints: Set[Expression] =
-    child.constraints.union(getAliasedConstraints(aggregateExpressions))
-
-  override def statistics: Statistics = {
-    if (groupingExpressions.isEmpty) {
-      Statistics(sizeInBytes = 1)
-    } else {
-      super.statistics
-    }
-  }
-}
-
-case class Window(
-    windowExpressions: Seq[NamedExpression],
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    child: LogicalPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] =
-    child.output ++ windowExpressions.map(_.toAttribute)
-
-  def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute))
-}
-
-private[sql] object Expand {
-  /**
-   * Extract attribute set according to the grouping id.
-   *
-   * @param bitmask bitmask over the attribute sequence; a set bit marks a non-selected attribute
-   * @param attrs the attributes in sequence
-   * @return the attributes that are not selected, as specified via the bitmask (bit set to 1)
-   */
-  private def buildNonSelectAttrSet(
-      bitmask: Int,
-      attrs: Seq[Attribute]): AttributeSet = {
-    val nonSelect = new ArrayBuffer[Attribute]()
-
-    var bit = attrs.length - 1
-    while (bit >= 0) {
-      if (((bitmask >> bit) & 1) == 1) nonSelect += attrs(attrs.length - bit - 1)
-      bit -= 1
-    }
-
-    AttributeSet(nonSelect)
-  }
-
-  /**
-   * Apply all of the GroupExpressions to every input row; hence we will get
-   * multiple output rows for each input row.
-   *
-   * @param bitmasks The set of bitmasks that represents the grouping sets
-   * @param groupByAliases The aliased original group by expressions
-   * @param groupByAttrs The attributes of aliased group by expressions
-   * @param gid Attribute of the grouping id
-   * @param child Child operator
-   */
-  def apply(
-    bitmasks: Seq[Int],
-    groupByAliases: Seq[Alias],
-    groupByAttrs: Seq[Attribute],
-    gid: Attribute,
-    child: LogicalPlan): Expand = {
-    // Create an array of Projections for the child projection, and replace the projections'
-    // expressions which equal GroupBy expressions with Literal(null), if those expressions
-    // are not set for this grouping set (according to the bit mask).
-    val projections = bitmasks.map { bitmask =>
-      // get the non selected grouping attributes according to the bit mask
-      val nonSelectedGroupAttrSet = buildNonSelectAttrSet(bitmask, groupByAttrs)
-
-      child.output ++ groupByAttrs.map { attr =>
-        if (nonSelectedGroupAttrSet.contains(attr)) {
-          // if the input attribute is in the non-selected grouping expression set for this
-          // grouping set, replace it with a constant null
-          Literal.create(null, attr.dataType)
-        } else {
-          attr
-        }
-      // groupingId is the last output, here we use the bit mask as the concrete value for it.
-      } :+ Literal.create(bitmask, IntegerType)
-    }
-
-    // the `groupByAttrs` have a different meaning in `Expand.output`: each could be the
-    // original grouping expression or null, so here we create new instances of them.
-    val output = child.output ++ groupByAttrs.map(_.newInstance) :+ gid
-    Expand(projections, output, Project(child.output ++ groupByAliases, child))
-  }
-}
-
-/**
- * Apply a number of projections to every input row, hence we will get multiple output rows for
- * an input row.
- *
- * @param projections to apply
- * @param output of all projections.
- * @param child operator.
- */
-case class Expand(
-    projections: Seq[Seq[Expression]],
-    output: Seq[Attribute],
-    child: LogicalPlan) extends UnaryNode {
-  override def references: AttributeSet =
-    AttributeSet(projections.flatten.flatMap(_.references))
-
-  override def statistics: Statistics = {
-    val sizeInBytes = super.statistics.sizeInBytes * projections.length
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-
-  // This operator can reuse attributes (for example making them null when doing a roll up) so
-  // the constraints of the child may no longer be valid.
-  override protected def validConstraints: Set[Expression] = Set.empty[Expression]
-}
-
-/**
- * A GROUP BY clause with GROUPING SETS can generate a result set equivalent
- * to that generated by a UNION ALL of multiple simple GROUP BY clauses.
- *
- * We will transform GROUPING SETS into the logical plan Aggregate(.., Expand) in the Analyzer
- *
- * @param bitmasks     A list of bitmasks; each bitmask indicates the selected
- *                     GroupBy expressions
- * @param groupByExprs The candidate Group By expressions; they take effect only if the
- *                     associated bit in the bitmask is set to 1.
- * @param child        Child operator
- * @param aggregations The aggregation expressions; non-selected group by expressions
- *                     are treated as constant null if they appear in these expressions
- */
-case class GroupingSets(
-    bitmasks: Seq[Int],
-    groupByExprs: Seq[Expression],
-    child: LogicalPlan,
-    aggregations: Seq[NamedExpression]) extends UnaryNode {
-
-  override def output: Seq[Attribute] = aggregations.map(_.toAttribute)
-
-  // Needs to be unresolved before it's translated to Aggregate + Expand because output attributes
-  // will change in analysis.
-  override lazy val resolved: Boolean = false
-}
-
-case class Pivot(
-    groupByExprs: Seq[NamedExpression],
-    pivotColumn: Expression,
-    pivotValues: Seq[Literal],
-    aggregates: Seq[Expression],
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = groupByExprs.map(_.toAttribute) ++ aggregates match {
-    case agg :: Nil => pivotValues.map(value => AttributeReference(value.toString, agg.dataType)())
-    case _ => pivotValues.flatMap{ value =>
-      aggregates.map(agg => AttributeReference(value + "_" + agg.sql, agg.dataType)())
-    }
-  }
-}
-
-object Limit {
-  def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
-    GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
-  }
-
-  def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
-    p match {
-      case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
-      case _ => None
-    }
-  }
-}
-
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-  override def maxRows: Option[Long] = {
-    limitExpr match {
-      case IntegerLiteral(limit) => Some(limit)
-      case _ => None
-    }
-  }
-  override lazy val statistics: Statistics = {
-    val limit = limitExpr.eval().asInstanceOf[Int]
-    val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-}
-
-case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-  override def maxRows: Option[Long] = {
-    limitExpr match {
-      case IntegerLiteral(limit) => Some(limit)
-      case _ => None
-    }
-  }
-  override lazy val statistics: Statistics = {
-    val limit = limitExpr.eval().asInstanceOf[Int]
-    val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-}
-
-case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
-}
-
-/**
- * Sample the dataset.
- *
- * @param lowerBound Lower-bound of the sampling probability (usually 0.0)
- * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
- *                   will be ub - lb.
- * @param withReplacement Whether to sample with replacement.
- * @param seed the random seed
- * @param child the LogicalPlan
- * @param isTableSample Whether this node was created from TABLESAMPLE in the parser.
- */
-case class Sample(
-    lowerBound: Double,
-    upperBound: Double,
-    withReplacement: Boolean,
-    seed: Long,
-    child: LogicalPlan)(
-    val isTableSample: java.lang.Boolean = false) extends UnaryNode {
-
-  override def output: Seq[Attribute] = child.output
-
-  override def statistics: Statistics = {
-    val ratio = upperBound - lowerBound
-    // BigInt can't multiply with Double
-    var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100
-    if (sizeInBytes == 0) {
-      sizeInBytes = 1
-    }
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-
-  override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil
-}
-
-/**
- * Returns a new logical plan that deduplicates input rows.
- */
-case class Distinct(child: LogicalPlan) extends UnaryNode {
-  override def maxRows: Option[Long] = child.maxRows
-  override def output: Seq[Attribute] = child.output
-}
-
-/**
- * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
- * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
- * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
- * of the output requires some specific ordering or distribution of the data.
- */
-case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
-  extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-}
-
-/**
- * A relation with one row. This is used in "SELECT ..." without a from clause.
- */
-case object OneRowRelation extends LeafNode {
-  override def maxRows: Option[Long] = Some(1)
-  override def output: Seq[Attribute] = Nil
-
-  /**
-   * Computes [[Statistics]] for this plan. The default implementation assumes the output
-   * cardinality is the product of all child plans' cardinalities, i.e. applies in the case
-   * of cartesian joins.
-   *
-   * [[LeafNode]]s must override this.
-   */
-  override def statistics: Statistics = Statistics(sizeInBytes = 1)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5c8a0ec9/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
deleted file mode 100644
index 83f527f..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ /dev/null
@@ -1,530 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.LongType
-import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
-
-/** Physical plan for Project. */
-case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
-  extends UnaryExecNode with CodegenSupport {
-
-  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].inputRDDs()
-  }
-
-  protected override def doProduce(ctx: CodegenContext): String = {
-    child.asInstanceOf[CodegenSupport].produce(ctx, this)
-  }
-
-  override def usedInputs: AttributeSet = {
-    // Only the attributes that are used at least twice should be evaluated before this plan;
-    // otherwise we can defer evaluation until the output attribute is actually used.
-    val usedExprIds = projectList.flatMap(_.collect {
-      case a: Attribute => a.exprId
-    })
-    val usedMoreThanOnce = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
-    references.filter(a => usedMoreThanOnce.contains(a.exprId))
-  }
-
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val exprs = projectList.map(x =>
-      ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output)))
-    ctx.currentVars = input
-    val resultVars = exprs.map(_.genCode(ctx))
-    // Evaluation of non-deterministic expressions can't be deferred.
-    val nonDeterministicAttrs = projectList.filterNot(_.deterministic).map(_.toAttribute)
-    s"""
-       |${evaluateRequiredVariables(output, resultVars, AttributeSet(nonDeterministicAttrs))}
-       |${consume(ctx, resultVars)}
-     """.stripMargin
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().mapPartitionsInternal { iter =>
-      val project = UnsafeProjection.create(projectList, child.output,
-        subexpressionEliminationEnabled)
-      iter.map(project)
-    }
-  }
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-}
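
The usedInputs bookkeeping above only forces attributes that are referenced
more than once. A standalone sketch of that grouping, with plain strings
standing in for expression IDs (hypothetical data):

    // Attribute references collected from a project list.
    val usedExprIds = Seq("a", "b", "a", "c")
    val usedMoreThanOnce = usedExprIds.groupBy(identity).filter(_._2.size > 1).keySet
    println(usedMoreThanOnce)  // Set(a): only "a" is evaluated up front;
                               // "b" and "c" are deferred to their use sites
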
-
-
-/** Physical plan for Filter. */
-case class FilterExec(condition: Expression, child: SparkPlan)
-  extends UnaryExecNode with CodegenSupport with PredicateHelper {
-
-  // Split out all the IsNotNulls from condition.
-  private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
-    case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true
-    case _ => false
-  }
-
-  // The columns that will be filtered out by `IsNotNull` can be considered not nullable.
-  private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)
-
-  // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate
-  // all the variables at the beginning to take advantage of short circuiting.
-  override def usedInputs: AttributeSet = AttributeSet.empty
-
-  override def output: Seq[Attribute] = {
-    child.output.map { a =>
-      if (a.nullable && notNullAttributes.contains(a.exprId)) {
-        a.withNullability(false)
-      } else {
-        a
-      }
-    }
-  }
-
-  private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].inputRDDs()
-  }
-
-  protected override def doProduce(ctx: CodegenContext): String = {
-    child.asInstanceOf[CodegenSupport].produce(ctx, this)
-  }
-
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val numOutput = metricTerm(ctx, "numOutputRows")
-
-    /**
-     * Generates code for `c`, using `in` for input attributes and `attrs` for nullability.
-     */
-    def genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String = {
-      val bound = BindReferences.bindReference(c, attrs)
-      val evaluated = evaluateRequiredVariables(child.output, in, c.references)
-
-      // Generate the code for the predicate.
-      val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
-      val nullCheck = if (bound.nullable) {
-        s"${ev.isNull} || "
-      } else {
-        s""
-      }
-
-      s"""
-         |$evaluated
-         |${ev.code}
-         |if (${nullCheck}!${ev.value}) continue;
-       """.stripMargin
-    }
-
-    ctx.currentVars = input
-
-    // To generate the predicates we follow this algorithm.
-    // For each predicate that is not IsNotNull, we generate the predicates one by one, loading
-    // attributes as necessary. For each of those attributes, if there is an IsNotNull predicate
-    // we generate that check *before* the predicate. After all of these predicates, we generate
-    // the remaining IsNotNull checks that were not part of other predicates.
-    // This avoids redundant IsNotNull checks and takes better advantage of short-circuiting by
-    // not loading attributes until they are needed.
-    // This is very performance sensitive.
-    // TODO: revisit this. We can consider reordering predicates as well.
-    val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
-    val generated = otherPreds.map { c =>
-      val nullChecks = c.references.map { r =>
-        val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
-        if (idx != -1 && !generatedIsNotNullChecks(idx)) {
-          generatedIsNotNullChecks(idx) = true
-          // Use the child's output. The nullability is what the child produced.
-          genPredicate(notNullPreds(idx), input, child.output)
-        } else {
-          ""
-        }
-      }.mkString("\n").trim
-
-      // Here we use *this* operator's output, with its nullability, since we have already
-      // enforced it with the IsNotNull checks above.
-      s"""
-         |$nullChecks
-         |${genPredicate(c, input, output)}
-       """.stripMargin.trim
-    }.mkString("\n")
-
-    val nullChecks = notNullPreds.zipWithIndex.map { case (c, idx) =>
-      if (!generatedIsNotNullChecks(idx)) {
-        genPredicate(c, input, child.output)
-      } else {
-        ""
-      }
-    }.mkString("\n")
-
-    // Reset isNull to false for the not-null columns so that downstream operators can
-    // generate better code (remove dead branches).
-    val resultVars = input.zipWithIndex.map { case (ev, i) =>
-      if (notNullAttributes.contains(child.output(i).exprId)) {
-        ev.isNull = "false"
-      }
-      ev
-    }
-
-    s"""
-       |$generated
-       |$nullChecks
-       |$numOutput.add(1);
-       |${consume(ctx, resultVars)}
-     """.stripMargin
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    child.execute().mapPartitionsInternal { iter =>
-      val predicate = newPredicate(condition, child.output)
-      iter.filter { row =>
-        val r = predicate(row)
-        if (r) numOutputRows += 1
-        r
-      }
-    }
-  }
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-}
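
The predicate-ordering algorithm in doConsume above can be sketched standalone.
Pred, the attribute names, and the emitted strings are hypothetical stand-ins
for bound expressions and generated Java.

    case class Pred(refs: Set[String], code: String)

    val notNullAttrs = Set("a", "b")  // attributes covered by IsNotNull predicates
    val otherPreds = Seq(Pred(Set("a"), "a > 1"), Pred(Set("a", "b"), "a < b"))
    val emitted = scala.collection.mutable.Set.empty[String]
    val generated = otherPreds.flatMap { p =>
      // Emit each needed IsNotNull check once, right before the first predicate that uses it.
      val checks = p.refs.toSeq.sorted.collect {
        case r if notNullAttrs(r) && emitted.add(r) => s"if ($r == null) continue;"
      }
      checks :+ s"if (!(${p.code})) continue;"
    }
    generated.foreach(println)
    // if (a == null) continue;
    // if (!(a > 1)) continue;
    // if (b == null) continue;
    // if (!(a < b)) continue;
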
-
-/**
- * Physical plan for sampling the dataset.
- *
- * @param lowerBound Lower-bound of the sampling probability (usually 0.0)
- * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
- *                   will be ub - lb.
- * @param withReplacement Whether to sample with replacement.
- * @param seed the random seed
- * @param child the SparkPlan
- */
-case class SampleExec(
-    lowerBound: Double,
-    upperBound: Double,
-    withReplacement: Boolean,
-    seed: Long,
-    child: SparkPlan) extends UnaryExecNode with CodegenSupport {
-  override def output: Seq[Attribute] = child.output
-
-  private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    if (withReplacement) {
-      // Disable gap sampling since the gap sampling method buffers two rows internally,
-      // requiring us to copy the row, which is more expensive than drawing from the random
-      // number generator.
-      new PartitionwiseSampledRDD[InternalRow, InternalRow](
-        child.execute(),
-        new PoissonSampler[InternalRow](upperBound - lowerBound, useGapSamplingIfPossible = false),
-        preservesPartitioning = true,
-        seed)
-    } else {
-      child.execute().randomSampleWithRange(lowerBound, upperBound, seed)
-    }
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].inputRDDs()
-  }
-
-  protected override def doProduce(ctx: CodegenContext): String = {
-    child.asInstanceOf[CodegenSupport].produce(ctx, this)
-  }
-
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val numOutput = metricTerm(ctx, "numOutputRows")
-    val sampler = ctx.freshName("sampler")
-
-    if (withReplacement) {
-      val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName
-      val initSampler = ctx.freshName("initSampler")
-      ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
-        s"$initSampler();")
-
-      ctx.addNewFunction(initSampler,
-        s"""
-          | private void $initSampler() {
-          |   $sampler = new $samplerClass<UnsafeRow>($upperBound - $lowerBound, false);
-          |   java.util.Random random = new java.util.Random(${seed}L);
-          |   long randomSeed = random.nextLong();
-          |   int loopCount = 0;
-          |   while (loopCount < partitionIndex) {
-          |     randomSeed = random.nextLong();
-          |     loopCount += 1;
-          |   }
-          |   $sampler.setSeed(randomSeed);
-          | }
-         """.stripMargin.trim)
-
-      val samplingCount = ctx.freshName("samplingCount")
-      s"""
-         | int $samplingCount = $sampler.sample();
-         | while ($samplingCount-- > 0) {
-         |   $numOutput.add(1);
-         |   ${consume(ctx, input)}
-         | }
-       """.stripMargin.trim
-    } else {
-      val samplerClass = classOf[BernoulliCellSampler[UnsafeRow]].getName
-      ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
-        s"""
-          | $sampler = new $samplerClass<UnsafeRow>($lowerBound, $upperBound, false);
-          | $sampler.setSeed(${seed}L + partitionIndex);
-         """.stripMargin.trim)
-
-      s"""
-         | if ($sampler.sample() == 0) continue;
-         | $numOutput.add(1);
-         | ${consume(ctx, input)}
-       """.stripMargin.trim
-    }
-  }
-}
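
A minimal standalone sketch of the without-replacement path above, using
java.util.Random in place of Spark's internal BernoulliCellSampler; the latter
keeps a row exactly when a uniform draw lands in [lowerBound, upperBound).

    import java.util.Random

    def bernoulliKeep(rng: Random, lb: Double, ub: Double): Boolean = {
      val x = rng.nextDouble()
      x >= lb && x < ub  // the same range test, applied once per row
    }

    val rng = new Random(42L)
    val kept = (1 to 100).count(_ => bernoulliKeep(rng, 0.0, 0.1))
    println(s"kept $kept of 100 rows")  // roughly 10; the exact count depends on the seed
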
-
-
-/**
- * Physical plan for range (generating a range of 64-bit numbers).
- *
- * @param start first number in the range, inclusive.
- * @param step size of the step increment.
- * @param numSlices number of partitions.
- * @param numElements total number of elements to output.
- * @param output output attributes.
- */
-case class RangeExec(
-    start: Long,
-    step: Long,
-    numSlices: Int,
-    numElements: BigInt,
-    output: Seq[Attribute])
-  extends LeafExecNode with CodegenSupport {
-
-  private[sql] override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
-  // output attributes should not affect the results
-  override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
-      .map(i => InternalRow(i)) :: Nil
-  }
-
-  protected override def doProduce(ctx: CodegenContext): String = {
-    val numOutput = metricTerm(ctx, "numOutputRows")
-
-    val initTerm = ctx.freshName("initRange")
-    ctx.addMutableState("boolean", initTerm, s"$initTerm = false;")
-    val partitionEnd = ctx.freshName("partitionEnd")
-    ctx.addMutableState("long", partitionEnd, s"$partitionEnd = 0L;")
-    val number = ctx.freshName("number")
-    ctx.addMutableState("long", number, s"$number = 0L;")
-    val overflow = ctx.freshName("overflow")
-    ctx.addMutableState("boolean", overflow, s"$overflow = false;")
-
-    val value = ctx.freshName("value")
-    val ev = ExprCode("", "false", value)
-    val BigInt = classOf[java.math.BigInteger].getName
-    val checkEnd = if (step > 0) {
-      s"$number < $partitionEnd"
-    } else {
-      s"$number > $partitionEnd"
-    }
-
-    ctx.addNewFunction("initRange",
-      s"""
-        | private void initRange(int idx) {
-        |   $BigInt index = $BigInt.valueOf(idx);
-        |   $BigInt numSlice = $BigInt.valueOf(${numSlices}L);
-        |   $BigInt numElement = $BigInt.valueOf(${numElements.toLong}L);
-        |   $BigInt step = $BigInt.valueOf(${step}L);
-        |   $BigInt start = $BigInt.valueOf(${start}L);
-        |
-        |   $BigInt st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
-        |   if (st.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) {
-        |     $number = Long.MAX_VALUE;
-        |   } else if (st.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) {
-        |     $number = Long.MIN_VALUE;
-        |   } else {
-        |     $number = st.longValue();
-        |   }
-        |
-        |   $BigInt end = index.add($BigInt.ONE).multiply(numElement).divide(numSlice)
-        |     .multiply(step).add(start);
-        |   if (end.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) {
-        |     $partitionEnd = Long.MAX_VALUE;
-        |   } else if (end.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) {
-        |     $partitionEnd = Long.MIN_VALUE;
-        |   } else {
-        |     $partitionEnd = end.longValue();
-        |   }
-        |
-        |   $numOutput.add(($partitionEnd - $number) / ${step}L);
-        | }
-       """.stripMargin)
-
-    val input = ctx.freshName("input")
-    // Right now, Range is only used when there is one upstream.
-    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
-    s"""
-      | // initialize Range
-      | if (!$initTerm) {
-      |   $initTerm = true;
-      |   initRange(partitionIndex);
-      | }
-      |
-      | while (!$overflow && $checkEnd) {
-      |  long $value = $number;
-      |  $number += ${step}L;
-      |  if ($number < $value ^ ${step}L < 0) {
-      |    $overflow = true;
-      |  }
-      |  ${consume(ctx, Seq(ev))}
-      |  if (shouldStop()) return;
-      | }
-     """.stripMargin
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    sqlContext
-      .sparkContext
-      .parallelize(0 until numSlices, numSlices)
-      .mapPartitionsWithIndex { (i, _) =>
-        val partitionStart = (i * numElements) / numSlices * step + start
-        val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
-        def getSafeMargin(bi: BigInt): Long =
-          if (bi.isValidLong) {
-            bi.toLong
-          } else if (bi > 0) {
-            Long.MaxValue
-          } else {
-            Long.MinValue
-          }
-        val safePartitionStart = getSafeMargin(partitionStart)
-        val safePartitionEnd = getSafeMargin(partitionEnd)
-        val rowSize = UnsafeRow.calculateBitSetWidthInBytes(1) + LongType.defaultSize
-        val unsafeRow = UnsafeRow.createFromByteArray(rowSize, 1)
-
-        new Iterator[InternalRow] {
-          private[this] var number: Long = safePartitionStart
-          private[this] var overflow: Boolean = false
-
-          override def hasNext =
-            if (!overflow) {
-              if (step > 0) {
-                number < safePartitionEnd
-              } else {
-                number > safePartitionEnd
-              }
-            } else false
-
-          override def next() = {
-            val ret = number
-            number += step
-            if (number < ret ^ step < 0) {
-              // We have Long.MaxValue + Long.MaxValue < Long.MaxValue
-              // and Long.MinValue + Long.MinValue > Long.MinValue, so if the step causes a step
-              // back, we are pretty sure that we have an overflow.
-              overflow = true
-            }
-
-            numOutputRows += 1
-            unsafeRow.setLong(0, ret)
-            unsafeRow
-          }
-        }
-      }
-  }
-}
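
The per-partition boundary math in doExecute above, including the clamp to the
Long range, as a standalone sketch; the slice counts are hypothetical.

    def boundaries(i: Int, numSlices: Int, numElements: BigInt, step: Long, start: Long): (Long, Long) = {
      def safe(bi: BigInt): Long =
        if (bi.isValidLong) bi.toLong
        else if (bi > 0) Long.MaxValue  // clamp rather than overflow
        else Long.MinValue
      val partitionStart = (i * numElements) / numSlices * step + start
      val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
      (safe(partitionStart), safe(partitionEnd))
    }

    println(boundaries(0, 4, BigInt(10), 1L, 0L))  // (0,2): partition 0 emits [0, 2)
    println(boundaries(3, 4, BigInt(10), 1L, 0L))  // (7,10): partition 3 emits [7, 10)
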
-
-/**
- * Physical plan for unioning multiple plans, without a distinct. This is UNION ALL in SQL.
- */
-case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
-  override def output: Seq[Attribute] =
-    children.map(_.output).transpose.map(attrs =>
-      attrs.head.withNullability(attrs.exists(_.nullable)))
-
-  protected override def doExecute(): RDD[InternalRow] =
-    sparkContext.union(children.map(_.execute()))
-}
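
A standalone sketch of the output-nullability rule above (a union column is
nullable if it is nullable in any child); Attr is a hypothetical stand-in for
Spark's Attribute.

    case class Attr(name: String, nullable: Boolean) {
      def withNullability(n: Boolean): Attr = copy(nullable = n)
    }

    val childOutputs = Seq(
      Seq(Attr("id", nullable = false), Attr("name", nullable = true)),
      Seq(Attr("id", nullable = true), Attr("name", nullable = false)))
    val output = childOutputs.transpose.map(attrs =>
      attrs.head.withNullability(attrs.exists(_.nullable)))
    println(output)  // List(Attr(id,true), Attr(name,true))
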
-
-/**
- * Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
- * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
- * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
- * the 100 new partitions will claim 10 of the current partitions.
- */
-case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
-  override def output: Seq[Attribute] = child.output
-
-  override def outputPartitioning: Partitioning = {
-    if (numPartitions == 1) SinglePartition
-    else UnknownPartitioning(numPartitions)
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().coalesce(numPartitions, shuffle = false)
-  }
-}
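
A hedged RDD-level sketch of the same narrow coalesce; a SparkContext named
`sc` is assumed to be in scope.

    // shuffle defaults to false, so this is a narrow dependency: each of the
    // 100 new partitions claims about 10 of the original 1000.
    val coalesced = sc.parallelize(1 to 100000, numSlices = 1000).coalesce(100)
    println(coalesced.getNumPartitions)  // 100
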
-
-/**
- * Physical plan for returning a table with the elements from left that are not in right, using
- * the built-in Spark subtract function.
- */
-case class ExceptExec(left: SparkPlan, right: SparkPlan) extends BinaryExecNode {
-  override def output: Seq[Attribute] = left.output
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
-  }
-}
-
-/**
- * A plan node that does nothing but lie about the output of its child.  Used to splice a
- * (hopefully structurally equivalent) tree from a different optimization sequence into an already
- * resolved tree.
- */
-case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
-  def children: Seq[SparkPlan] = child :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = child.execute()
-}
-
-/**
- * Physical plan for a subquery.
- *
- * This is used to generate the tree string for SparkScalarSubquery.
- */
-case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
-  override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org