Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/23 21:44:08 UTC
[2/2] spark git commit: [SPARK-14872][SQL] Restructure command package
[SPARK-14872][SQL] Restructure command package
## What changes were proposed in this pull request?
This patch restructures the sql.execution.command package to break the commands into multiple files, with a logical organization: databases, tables, views, functions.
I also renamed the two basicOperators.scala files to basicLogicalOperators.scala (in catalyst) and basicPhysicalOperators.scala (in sql/core), respectively.
## How was this patch tested?
N/A - all I did was move code around.
Author: Reynold Xin <rx...@databricks.com>
Closes #12636 from rxin/SPARK-14872.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c8a0ec9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c8a0ec9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c8a0ec9
Branch: refs/heads/master
Commit: 5c8a0ec99bded2271481f8d6cf5443fea5da4bbd
Parents: fddd3ae
Author: Reynold Xin <rx...@databricks.com>
Authored: Sat Apr 23 12:44:00 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sat Apr 23 12:44:00 2016 -0700
----------------------------------------------------------------------
.../plans/logical/basicLogicalOperators.scala | 709 +++++++++++++++++++
.../catalyst/plans/logical/basicOperators.scala | 709 -------------------
.../spark/sql/execution/basicOperators.scala | 530 --------------
.../sql/execution/basicPhysicalOperators.scala | 530 ++++++++++++++
.../spark/sql/execution/command/cache.scala | 70 ++
.../spark/sql/execution/command/commands.scala | 264 +------
.../spark/sql/execution/command/databases.scala | 64 ++
.../spark/sql/execution/command/functions.scala | 99 ++-
.../spark/sql/execution/command/tables.scala | 77 +-
9 files changed, 1556 insertions(+), 1496 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5c8a0ec9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
new file mode 100644
index 0000000..a445ce6
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.types._
+
+/**
+ * When planning take() or collect() operations, this special node is inserted at the top of
+ * the logical plan before invoking the query planner.
+ *
+ * Rules can pattern-match on this node in order to apply transformations that only take effect
+ * at the top of the logical query plan.
+ */
+case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+ override def maxRows: Option[Long] = child.maxRows
+
+ override lazy val resolved: Boolean = {
+ val hasSpecialExpressions = projectList.exists ( _.collect {
+ case agg: AggregateExpression => agg
+ case generator: Generator => generator
+ case window: WindowExpression => window
+ }.nonEmpty
+ )
+
+ !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
+ }
+
+ override def validConstraints: Set[Expression] =
+ child.constraints.union(getAliasedConstraints(projectList))
+}
+
+/**
+ * Applies a [[Generator]] to a stream of input rows, combining the
+ * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional
+ * programming with one important additional feature, which allows the input rows to be joined with
+ * their output.
+ *
+ * @param generator the generator expression
+ * @param join when true, each output row is implicitly joined with the input tuple that produced
+ * it.
+ * @param outer when true, each input row will be output at least once, even if the output of the
+ * given `generator` is empty. `outer` has no effect when `join` is false.
+ * @param qualifier Qualifier for the attributes of the generator (UDTF)
+ * @param generatorOutput The output schema of the Generator.
+ * @param child Child logical plan node
+ */
+case class Generate(
+ generator: Generator,
+ join: Boolean,
+ outer: Boolean,
+ qualifier: Option[String],
+ generatorOutput: Seq[Attribute],
+ child: LogicalPlan)
+ extends UnaryNode {
+
+ /** The set of all attributes produced by this node. */
+ def generatedSet: AttributeSet = AttributeSet(generatorOutput)
+
+ override lazy val resolved: Boolean = {
+ generator.resolved &&
+ childrenResolved &&
+ generator.elementTypes.length == generatorOutput.length &&
+ generatorOutput.forall(_.resolved)
+ }
+
+ override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
+
+ def output: Seq[Attribute] = {
+ val qualified = qualifier.map(q =>
+ // prepend the new qualifier to the existing one
+ generatorOutput.map(a => a.withQualifier(Some(q)))
+ ).getOrElse(generatorOutput)
+
+ if (join) child.output ++ qualified else qualified
+ }
+}
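To make the `join`/`outer` flags concrete, here is a sketch of how a HiveQL
LATERAL VIEW maps onto this node (the identifiers below are illustrative only):

    // SELECT key, v FROM src LATERAL VIEW explode(values) t AS v
    // is analyzed into roughly:
    Generate(
      generator = Explode(values),  // one output row per array element
      join = true,                  // keep src's columns next to each generated v
      outer = false,                // an empty array yields no rows for that input
      qualifier = Some("t"),
      generatorOutput = Seq(v),
      child = src)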
+
+case class Filter(condition: Expression, child: LogicalPlan)
+ extends UnaryNode with PredicateHelper {
+ override def output: Seq[Attribute] = child.output
+
+ override def maxRows: Option[Long] = child.maxRows
+
+ override protected def validConstraints: Set[Expression] =
+ child.constraints.union(splitConjunctivePredicates(condition).toSet)
+}
+
+abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+
+ protected def leftConstraints: Set[Expression] = left.constraints
+
+ protected def rightConstraints: Set[Expression] = {
+ require(left.output.size == right.output.size)
+ val attributeRewrites = AttributeMap(right.output.zip(left.output))
+ right.constraints.map(_ transform {
+ case a: Attribute => attributeRewrites(a)
+ })
+ }
+}
+
+private[sql] object SetOperation {
+ def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
+}
+
+case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
+
+ def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+ override def output: Seq[Attribute] =
+ left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
+ leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
+ }
+
+ override protected def validConstraints: Set[Expression] =
+ leftConstraints.union(rightConstraints)
+
+ // Intersect is only resolved if it doesn't introduce ambiguous expression ids,
+ // since the Optimizer will convert Intersect to Join.
+ override lazy val resolved: Boolean =
+ childrenResolved &&
+ left.output.length == right.output.length &&
+ left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
+ duplicateResolved
+
+ override def maxRows: Option[Long] = {
+ if (children.exists(_.maxRows.isEmpty)) {
+ None
+ } else {
+ Some(children.flatMap(_.maxRows).min)
+ }
+ }
+
+ override def statistics: Statistics = {
+ val leftSize = left.statistics.sizeInBytes
+ val rightSize = right.statistics.sizeInBytes
+ val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
+ Statistics(sizeInBytes = sizeInBytes)
+ }
+}
+
+case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
+ /** We don't use right.output because those rows get excluded from the set. */
+ override def output: Seq[Attribute] = left.output
+
+ override protected def validConstraints: Set[Expression] = leftConstraints
+
+ override lazy val resolved: Boolean =
+ childrenResolved &&
+ left.output.length == right.output.length &&
+ left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
+
+ override def statistics: Statistics = {
+ Statistics(sizeInBytes = left.statistics.sizeInBytes)
+ }
+}
+
+/** Factory for constructing new `Union` nodes. */
+object Union {
+ def apply(left: LogicalPlan, right: LogicalPlan): Union = {
+ Union (left :: right :: Nil)
+ }
+}
+
+case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
+ override def maxRows: Option[Long] = {
+ if (children.exists(_.maxRows.isEmpty)) {
+ None
+ } else {
+ Some(children.flatMap(_.maxRows).sum)
+ }
+ }
+
+ // updating nullability to make all the children consistent
+ override def output: Seq[Attribute] =
+ children.map(_.output).transpose.map(attrs =>
+ attrs.head.withNullability(attrs.exists(_.nullable)))
+
+ override lazy val resolved: Boolean = {
+ // allChildrenCompatible needs to be evaluated after childrenResolved
+ def allChildrenCompatible: Boolean =
+ children.tail.forall( child =>
+ // compare the attribute number with the first child
+ child.output.length == children.head.output.length &&
+ // compare the data types with the first child
+ child.output.zip(children.head.output).forall {
+ case (l, r) => l.dataType == r.dataType }
+ )
+
+ children.length > 1 && childrenResolved && allChildrenCompatible
+ }
+
+ override def statistics: Statistics = {
+ val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
+ Statistics(sizeInBytes = sizeInBytes)
+ }
+
+ /**
+ * Maps the constraints containing a given (original) sequence of attributes to those with a
+ * given (reference) sequence of attributes. Given the nature of union, we expect that the
+ * mapping between the original and reference sequences is symmetric.
+ */
+ private def rewriteConstraints(
+ reference: Seq[Attribute],
+ original: Seq[Attribute],
+ constraints: Set[Expression]): Set[Expression] = {
+ require(reference.size == original.size)
+ val attributeRewrites = AttributeMap(original.zip(reference))
+ constraints.map(_ transform {
+ case a: Attribute => attributeRewrites(a)
+ })
+ }
+
+ private def merge(a: Set[Expression], b: Set[Expression]): Set[Expression] = {
+ val common = a.intersect(b)
+ // A constraint with only one reference can easily be inferred as a predicate.
+ // Group the constraints by their references so we can combine constraints with
+ // the same reference together.
+ val othera = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
+ val otherb = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
+ // loosen the constraints by: A1 && B1 || A2 && B2 -> (A1 || A2) && (B1 || B2)
+ val others = (othera.keySet intersect otherb.keySet).map { attr =>
+ Or(othera(attr).reduceLeft(And), otherb(attr).reduceLeft(And))
+ }
+ common ++ others
+ }
+
+ override protected def validConstraints: Set[Expression] = {
+ children
+ .map(child => rewriteConstraints(children.head.output, child.output, child.constraints))
+ .reduce(merge(_, _))
+ }
+}
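A minimal, self-contained sketch of the `merge` step above, with constraints
modeled as plain strings (each referencing a single attribute) and `refs`
standing in for `Expression.references`:

    object UnionMergeDemo {
      // e.g. "a > 0" -> "a"; stands in for the single attribute a constraint references
      private def refs(c: String): String = c.takeWhile(_.isLetter)

      def merge(a: Set[String], b: Set[String]): Set[String] = {
        val common = a.intersect(b)
        val othera = a.diff(common).groupBy(refs)
        val otherb = b.diff(common).groupBy(refs)
        // loosen the per-attribute constraints, as in the comment above
        val loosened = (othera.keySet intersect otherb.keySet).map { attr =>
          s"(${othera(attr).mkString(" && ")}) || (${otherb(attr).mkString(" && ")})"
        }
        common ++ loosened
      }

      def main(args: Array[String]): Unit = {
        // left child guarantees {a > 0, b = 1}; right child guarantees {a > 10, b = 1}
        println(merge(Set("a > 0", "b = 1"), Set("a > 10", "b = 1")))
        // -> Set(b = 1, (a > 0) || (a > 10))
      }
    }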
+
+case class Join(
+ left: LogicalPlan,
+ right: LogicalPlan,
+ joinType: JoinType,
+ condition: Option[Expression])
+ extends BinaryNode with PredicateHelper {
+
+ override def output: Seq[Attribute] = {
+ joinType match {
+ case LeftExistence(_) =>
+ left.output
+ case LeftOuter =>
+ left.output ++ right.output.map(_.withNullability(true))
+ case RightOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output
+ case FullOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+ case _ =>
+ left.output ++ right.output
+ }
+ }
+
+ override protected def validConstraints: Set[Expression] = {
+ joinType match {
+ case Inner if condition.isDefined =>
+ left.constraints
+ .union(right.constraints)
+ .union(splitConjunctivePredicates(condition.get).toSet)
+ case LeftSemi if condition.isDefined =>
+ left.constraints
+ .union(splitConjunctivePredicates(condition.get).toSet)
+ case Inner =>
+ left.constraints.union(right.constraints)
+ case LeftExistence(_) =>
+ left.constraints
+ case LeftOuter =>
+ left.constraints
+ case RightOuter =>
+ right.constraints
+ case FullOuter =>
+ Set.empty[Expression]
+ }
+ }
+
+ def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+ // Joins are only resolved if they don't introduce ambiguous expression ids.
+ // NaturalJoin should be ready for resolution only if everything else is resolved here
+ lazy val resolvedExceptNatural: Boolean = {
+ childrenResolved &&
+ expressions.forall(_.resolved) &&
+ duplicateResolved &&
+ condition.forall(_.dataType == BooleanType)
+ }
+
+ // If this is not a natural join, use `resolvedExceptNatural`. If it is a natural join or
+ // a USING join, we still need to eliminate the natural/using clauses before marking it resolved.
+ override lazy val resolved: Boolean = joinType match {
+ case NaturalJoin(_) => false
+ case UsingJoin(_, _) => false
+ case _ => resolvedExceptNatural
+ }
+}
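As an illustration of the nullability adjustment in `output`, assume `left`
produces a non-nullable attribute `a` and `right` a non-nullable `b`:

    // Inner      => a, b                      (nullability unchanged)
    // LeftOuter  => a, b (nullable)           (the right side may be null-padded)
    // RightOuter => a (nullable), b
    // FullOuter  => a (nullable), b (nullable)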
+
+/**
+ * A hint for the optimizer that we should broadcast the `child` if used in a join operator.
+ */
+case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+
+ // We manually set the statistics of BroadcastHint to the smallest value to make sure
+ // the plan wrapped by BroadcastHint will be considered for broadcast later.
+ override def statistics: Statistics = Statistics(sizeInBytes = 1)
+}
+
+case class InsertIntoTable(
+ table: LogicalPlan,
+ partition: Map[String, Option[String]],
+ child: LogicalPlan,
+ overwrite: Boolean,
+ ifNotExists: Boolean)
+ extends LogicalPlan {
+
+ override def children: Seq[LogicalPlan] = child :: Nil
+ override def output: Seq[Attribute] = Seq.empty
+
+ assert(overwrite || !ifNotExists)
+ override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
+ case (childAttr, tableAttr) =>
+ DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
+ }
+}
+
+/**
+ * A container for holding named common table expressions (CTEs) and a query plan.
+ * This operator will be removed during analysis and the relations will be substituted into child.
+ *
+ * @param child The final query of this CTE.
+ * @param cteRelations The queries that this CTE defines;
+ * the key is the alias of a CTE definition,
+ * the value is the CTE definition itself.
+ */
+case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
+
+case class WithWindowDefinition(
+ windowDefinitions: Map[String, WindowSpecDefinition],
+ child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * @param order The ordering expressions
+ * @param global True means global sorting applies to the entire data set;
+ * False means sorting applies only within each partition.
+ * @param child Child logical plan
+ */
+case class Sort(
+ order: Seq[SortOrder],
+ global: Boolean,
+ child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+ override def maxRows: Option[Long] = child.maxRows
+}
+
+/** Factory for constructing new `Range` nodes. */
+object Range {
+ def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
+ val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
+ new Range(start, end, step, numSlices, output)
+ }
+}
+
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int,
+ output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
+ (safeEnd - safeStart) / step
+ } else {
+ // the remainder has the same sign as the range, so one more element fits
+ (safeEnd - safeStart) / step + 1
+ }
+ }
+
+ override def newInstance(): Range =
+ Range(start, end, step, numSlices, output.map(_.newInstance()))
+
+ override def statistics: Statistics = {
+ val sizeInBytes = LongType.defaultSize * numElements
+ Statistics(sizeInBytes = sizeInBytes)
+ }
+}
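A worked example of the `numElements` arithmetic, assuming start = 0, end = 10
and step = 3 (i.e. the values 0, 3, 6, 9):

    val (start, end, step) = (BigInt(0), BigInt(10), 3)
    // (10 - 0) % 3 == 1, and (end > start) agrees with (step > 0),
    // so the remainder contributes one extra element:
    val numElements =
      if ((end - start) % step == 0 || (end > start) != (step > 0)) (end - start) / step
      else (end - start) / step + 1
    assert(numElements == 4)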
+
+case class Aggregate(
+ groupingExpressions: Seq[Expression],
+ aggregateExpressions: Seq[NamedExpression],
+ child: LogicalPlan)
+ extends UnaryNode {
+
+ override lazy val resolved: Boolean = {
+ val hasWindowExpressions = aggregateExpressions.exists ( _.collect {
+ case window: WindowExpression => window
+ }.nonEmpty
+ )
+
+ !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
+ }
+
+ override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
+ override def maxRows: Option[Long] = child.maxRows
+
+ override def validConstraints: Set[Expression] =
+ child.constraints.union(getAliasedConstraints(aggregateExpressions))
+
+ override def statistics: Statistics = {
+ if (groupingExpressions.isEmpty) {
+ Statistics(sizeInBytes = 1)
+ } else {
+ super.statistics
+ }
+ }
+}
+
+case class Window(
+ windowExpressions: Seq[NamedExpression],
+ partitionSpec: Seq[Expression],
+ orderSpec: Seq[SortOrder],
+ child: LogicalPlan) extends UnaryNode {
+
+ override def output: Seq[Attribute] =
+ child.output ++ windowExpressions.map(_.toAttribute)
+
+ def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute))
+}
+
+private[sql] object Expand {
+ /**
+ * Extract attribute set according to the grouping id.
+ *
+ * @param bitmask bitmask representing which attributes of the sequence are selected
+ * @param attrs the attributes in sequence
+ * @return the attributes that are not selected, as specified via the bitmask (with the bit set to 1)
+ */
+ private def buildNonSelectAttrSet(
+ bitmask: Int,
+ attrs: Seq[Attribute]): AttributeSet = {
+ val nonSelect = new ArrayBuffer[Attribute]()
+
+ var bit = attrs.length - 1
+ while (bit >= 0) {
+ if (((bitmask >> bit) & 1) == 1) nonSelect += attrs(attrs.length - bit - 1)
+ bit -= 1
+ }
+
+ AttributeSet(nonSelect)
+ }
+
+ /**
+ * Apply all of the GroupExpressions to every input row; hence we will get
+ * multiple output rows for an input row.
+ *
+ * @param bitmasks The bitmask set represents the grouping sets
+ * @param groupByAliases The aliased original group by expressions
+ * @param groupByAttrs The attributes of aliased group by expressions
+ * @param gid Attribute of the grouping id
+ * @param child Child operator
+ */
+ def apply(
+ bitmasks: Seq[Int],
+ groupByAliases: Seq[Alias],
+ groupByAttrs: Seq[Attribute],
+ gid: Attribute,
+ child: LogicalPlan): Expand = {
+ // Create an array of Projections for the child projection, and replace the projections'
+ // expressions which equal GroupBy expressions with Literal(null), if those expressions
+ // are not set for this grouping set (according to the bit mask).
+ val projections = bitmasks.map { bitmask =>
+ // get the non-selected grouping attributes according to the bitmask
+ val nonSelectedGroupAttrSet = buildNonSelectAttrSet(bitmask, groupByAttrs)
+
+ child.output ++ groupByAttrs.map { attr =>
+ if (nonSelectedGroupAttrSet.contains(attr)) {
+ // if the input attribute is in the non-selected grouping expression set for this group,
+ // replace it with a constant null
+ Literal.create(null, attr.dataType)
+ } else {
+ attr
+ }
+ // groupingId is the last output, here we use the bit mask as the concrete value for it.
+ } :+ Literal.create(bitmask, IntegerType)
+ }
+
+ // the `groupByAttrs` have a different meaning in `Expand.output`: each could be the original
+ // grouping expression or null, so here we create new instances of them.
+ val output = child.output ++ groupByAttrs.map(_.newInstance) :+ gid
+ Expand(projections, output, Project(child.output ++ groupByAliases, child))
+ }
+}
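A small sketch of the bitmask decoding done by `buildNonSelectAttrSet`, using
attribute names in place of Catalyst attributes. For GROUP BY a, b with
GROUPING SETS ((a, b), (a), ()), the analyzer would pass bitmasks like Seq(0, 1, 3):

    def nonSelected(bitmask: Int, attrs: Seq[String]): Set[String] =
      (0 until attrs.length).collect {
        case bit if ((bitmask >> bit) & 1) == 1 => attrs(attrs.length - bit - 1)
      }.toSet

    assert(nonSelected(0, Seq("a", "b")) == Set())          // (a, b): nothing nulled
    assert(nonSelected(1, Seq("a", "b")) == Set("b"))       // (a): b replaced by null
    assert(nonSelected(3, Seq("a", "b")) == Set("a", "b"))  // (): both replaced by null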
+
+/**
+ * Apply a number of projections to every input row, hence we will get multiple output rows for
+ * an input row.
+ *
+ * @param projections to apply
+ * @param output of all projections.
+ * @param child operator.
+ */
+case class Expand(
+ projections: Seq[Seq[Expression]],
+ output: Seq[Attribute],
+ child: LogicalPlan) extends UnaryNode {
+ override def references: AttributeSet =
+ AttributeSet(projections.flatten.flatMap(_.references))
+
+ override def statistics: Statistics = {
+ val sizeInBytes = super.statistics.sizeInBytes * projections.length
+ Statistics(sizeInBytes = sizeInBytes)
+ }
+
+ // This operator can reuse attributes (for example making them null when doing a roll up) so
+ // the constraints of the child may no longer be valid.
+ override protected def validConstraints: Set[Expression] = Set.empty[Expression]
+}
+
+/**
+ * A GROUP BY clause with GROUPING SETS can generate a result set equivalent
+ * to that generated by a UNION ALL of multiple simple GROUP BY clauses.
+ *
+ * We will transform GROUPING SETS into the logical plan Aggregate(.., Expand) in the Analyzer.
+ *
+ * @param bitmasks A list of bitmasks, each of which indicates the selected
+ * GroupBy expressions
+ * @param groupByExprs The Group By expression candidates, which take effect only if the
+ * associated bit in the bitmask is set to 1
+ * @param child Child operator
+ * @param aggregations The aggregation expressions; non-selected group by expressions
+ * are treated as constant null if they appear in these expressions
+ */
+case class GroupingSets(
+ bitmasks: Seq[Int],
+ groupByExprs: Seq[Expression],
+ child: LogicalPlan,
+ aggregations: Seq[NamedExpression]) extends UnaryNode {
+
+ override def output: Seq[Attribute] = aggregations.map(_.toAttribute)
+
+ // Needs to be unresolved before it's translated to Aggregate + Expand because output attributes
+ // will change in analysis.
+ override lazy val resolved: Boolean = false
+}
+
+case class Pivot(
+ groupByExprs: Seq[NamedExpression],
+ pivotColumn: Expression,
+ pivotValues: Seq[Literal],
+ aggregates: Seq[Expression],
+ child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = groupByExprs.map(_.toAttribute) ++ aggregates match {
+ case agg :: Nil => pivotValues.map(value => AttributeReference(value.toString, agg.dataType)())
+ case _ => pivotValues.flatMap{ value =>
+ aggregates.map(agg => AttributeReference(value + "_" + agg.sql, agg.dataType)())
+ }
+ }
+}
+
+object Limit {
+ def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
+ GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
+ }
+
+ def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
+ p match {
+ case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
+ case _ => None
+ }
+ }
+}
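For instance, LIMIT 10 is planned as a per-partition limit wrapped in a global
one; a sketch, with `Literal(10)` standing in for the parsed limit expression:

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical._

    val plan = Limit(Literal(10), OneRowRelation)
    // plan == GlobalLimit(Literal(10), LocalLimit(Literal(10), OneRowRelation)),
    // and the extractor above matches it back to (Literal(10), OneRowRelation)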
+
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+ override def maxRows: Option[Long] = {
+ limitExpr match {
+ case IntegerLiteral(limit) => Some(limit)
+ case _ => None
+ }
+ }
+ override lazy val statistics: Statistics = {
+ val limit = limitExpr.eval().asInstanceOf[Int]
+ val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+ Statistics(sizeInBytes = sizeInBytes)
+ }
+}
+
+case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+ override def maxRows: Option[Long] = {
+ limitExpr match {
+ case IntegerLiteral(limit) => Some(limit)
+ case _ => None
+ }
+ }
+ override lazy val statistics: Statistics = {
+ val limit = limitExpr.eval().asInstanceOf[Int]
+ val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+ Statistics(sizeInBytes = sizeInBytes)
+ }
+}
+
+case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
+
+ override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
+}
+
+/**
+ * Sample the dataset.
+ *
+ * @param lowerBound Lower-bound of the sampling probability (usually 0.0)
+ * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
+ * will be ub - lb.
+ * @param withReplacement Whether to sample with replacement.
+ * @param seed the random seed
+ * @param child the LogicalPlan
+ * @param isTableSample Whether this node was created from TABLESAMPLE in the parser.
+ */
+case class Sample(
+ lowerBound: Double,
+ upperBound: Double,
+ withReplacement: Boolean,
+ seed: Long,
+ child: LogicalPlan)(
+ val isTableSample: java.lang.Boolean = false) extends UnaryNode {
+
+ override def output: Seq[Attribute] = child.output
+
+ override def statistics: Statistics = {
+ val ratio = upperBound - lowerBound
+ // BigInt can't multiply with Double
+ var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100
+ if (sizeInBytes == 0) {
+ sizeInBytes = 1
+ }
+ Statistics(sizeInBytes = sizeInBytes)
+ }
+
+ override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil
+}
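A worked example of the size estimate above, assuming the child reports
sizeInBytes = 1000 and we sample the [0.0, 0.25) band:

    val childSize = BigInt(1000)             // hypothetical child statistics
    val ratio = 0.25 - 0.0                   // upperBound - lowerBound
    val estimate = childSize * (ratio * 100).toInt / 100
    assert(estimate == 250)                  // the BigInt-friendly form of 1000 * 0.25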
+
+/**
+ * Returns a new logical plan that dedups input rows.
+ */
+case class Distinct(child: LogicalPlan) extends UnaryNode {
+ override def maxRows: Option[Long] = child.maxRows
+ override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
+ * [[RepartitionByExpression]] in that this node is created directly by the DataFrame API when
+ * the user asks for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the
+ * consumer of the output requires some specific ordering or distribution of the data.
+ */
+case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
+ extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * A relation with one row. This is used in "SELECT ..." without a from clause.
+ */
+case object OneRowRelation extends LeafNode {
+ override def maxRows: Option[Long] = Some(1)
+ override def output: Seq[Attribute] = Nil
+
+ /**
+ * Computes [[Statistics]] for this plan. The default implementation assumes the output
+ * cardinality is the product of all child plans' cardinalities, i.e. it applies in the case
+ * of cartesian joins.
+ *
+ * [[LeafNode]]s must override this.
+ */
+ override def statistics: Statistics = Statistics(sizeInBytes = 1)
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/5c8a0ec9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
deleted file mode 100644
index a445ce6..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ /dev/null
@@ -1,709 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.types._
-
-/**
- * When planning take() or collect() operations, this special node is inserted at the top of
- * the logical plan before invoking the query planner.
- *
- * Rules can pattern-match on this node in order to apply transformations that only take effect
- * at the top of the logical query plan.
- */
-case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
-
-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = projectList.map(_.toAttribute)
- override def maxRows: Option[Long] = child.maxRows
-
- override lazy val resolved: Boolean = {
- val hasSpecialExpressions = projectList.exists ( _.collect {
- case agg: AggregateExpression => agg
- case generator: Generator => generator
- case window: WindowExpression => window
- }.nonEmpty
- )
-
- !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
- }
-
- override def validConstraints: Set[Expression] =
- child.constraints.union(getAliasedConstraints(projectList))
-}
-
-/**
- * Applies a [[Generator]] to a stream of input rows, combining the
- * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional
- * programming with one important additional feature, which allows the input rows to be joined with
- * their output.
- *
- * @param generator the generator expression
- * @param join when true, each output row is implicitly joined with the input tuple that produced
- * it.
- * @param outer when true, each input row will be output at least once, even if the output of the
- * given `generator` is empty. `outer` has no effect when `join` is false.
- * @param qualifier Qualifier for the attributes of the generator (UDTF)
- * @param generatorOutput The output schema of the Generator.
- * @param child Child logical plan node
- */
-case class Generate(
- generator: Generator,
- join: Boolean,
- outer: Boolean,
- qualifier: Option[String],
- generatorOutput: Seq[Attribute],
- child: LogicalPlan)
- extends UnaryNode {
-
- /** The set of all attributes produced by this node. */
- def generatedSet: AttributeSet = AttributeSet(generatorOutput)
-
- override lazy val resolved: Boolean = {
- generator.resolved &&
- childrenResolved &&
- generator.elementTypes.length == generatorOutput.length &&
- generatorOutput.forall(_.resolved)
- }
-
- override def producedAttributes: AttributeSet = AttributeSet(generatorOutput)
-
- def output: Seq[Attribute] = {
- val qualified = qualifier.map(q =>
- // prepend the new qualifier to the existing one
- generatorOutput.map(a => a.withQualifier(Some(q)))
- ).getOrElse(generatorOutput)
-
- if (join) child.output ++ qualified else qualified
- }
-}
-
-case class Filter(condition: Expression, child: LogicalPlan)
- extends UnaryNode with PredicateHelper {
- override def output: Seq[Attribute] = child.output
-
- override def maxRows: Option[Long] = child.maxRows
-
- override protected def validConstraints: Set[Expression] =
- child.constraints.union(splitConjunctivePredicates(condition).toSet)
-}
-
-abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
-
- protected def leftConstraints: Set[Expression] = left.constraints
-
- protected def rightConstraints: Set[Expression] = {
- require(left.output.size == right.output.size)
- val attributeRewrites = AttributeMap(right.output.zip(left.output))
- right.constraints.map(_ transform {
- case a: Attribute => attributeRewrites(a)
- })
- }
-}
-
-private[sql] object SetOperation {
- def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
-}
-
-case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
-
- def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
-
- override def output: Seq[Attribute] =
- left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
- leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
- }
-
- override protected def validConstraints: Set[Expression] =
- leftConstraints.union(rightConstraints)
-
- // Intersect is only resolved if it doesn't introduce ambiguous expression ids,
- // since the Optimizer will convert Intersect to Join.
- override lazy val resolved: Boolean =
- childrenResolved &&
- left.output.length == right.output.length &&
- left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
- duplicateResolved
-
- override def maxRows: Option[Long] = {
- if (children.exists(_.maxRows.isEmpty)) {
- None
- } else {
- Some(children.flatMap(_.maxRows).min)
- }
- }
-
- override def statistics: Statistics = {
- val leftSize = left.statistics.sizeInBytes
- val rightSize = right.statistics.sizeInBytes
- val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
- Statistics(sizeInBytes = sizeInBytes)
- }
-}
-
-case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
- /** We don't use right.output because those rows get excluded from the set. */
- override def output: Seq[Attribute] = left.output
-
- override protected def validConstraints: Set[Expression] = leftConstraints
-
- override lazy val resolved: Boolean =
- childrenResolved &&
- left.output.length == right.output.length &&
- left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
-
- override def statistics: Statistics = {
- Statistics(sizeInBytes = left.statistics.sizeInBytes)
- }
-}
-
-/** Factory for constructing new `Union` nodes. */
-object Union {
- def apply(left: LogicalPlan, right: LogicalPlan): Union = {
- Union (left :: right :: Nil)
- }
-}
-
-case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
- override def maxRows: Option[Long] = {
- if (children.exists(_.maxRows.isEmpty)) {
- None
- } else {
- Some(children.flatMap(_.maxRows).sum)
- }
- }
-
- // updating nullability to make all the children consistent
- override def output: Seq[Attribute] =
- children.map(_.output).transpose.map(attrs =>
- attrs.head.withNullability(attrs.exists(_.nullable)))
-
- override lazy val resolved: Boolean = {
- // allChildrenCompatible needs to be evaluated after childrenResolved
- def allChildrenCompatible: Boolean =
- children.tail.forall( child =>
- // compare the attribute number with the first child
- child.output.length == children.head.output.length &&
- // compare the data types with the first child
- child.output.zip(children.head.output).forall {
- case (l, r) => l.dataType == r.dataType }
- )
-
- children.length > 1 && childrenResolved && allChildrenCompatible
- }
-
- override def statistics: Statistics = {
- val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
- Statistics(sizeInBytes = sizeInBytes)
- }
-
- /**
- * Maps the constraints containing a given (original) sequence of attributes to those with a
- * given (reference) sequence of attributes. Given the nature of union, we expect that the
- * mapping between the original and reference sequences is symmetric.
- */
- private def rewriteConstraints(
- reference: Seq[Attribute],
- original: Seq[Attribute],
- constraints: Set[Expression]): Set[Expression] = {
- require(reference.size == original.size)
- val attributeRewrites = AttributeMap(original.zip(reference))
- constraints.map(_ transform {
- case a: Attribute => attributeRewrites(a)
- })
- }
-
- private def merge(a: Set[Expression], b: Set[Expression]): Set[Expression] = {
- val common = a.intersect(b)
- // A constraint with only one reference can easily be inferred as a predicate.
- // Group the constraints by their references so we can combine constraints with
- // the same reference together.
- val othera = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
- val otherb = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
- // loosen the constraints by: A1 && B1 || A2 && B2 -> (A1 || A2) && (B1 || B2)
- val others = (othera.keySet intersect otherb.keySet).map { attr =>
- Or(othera(attr).reduceLeft(And), otherb(attr).reduceLeft(And))
- }
- common ++ others
- }
-
- override protected def validConstraints: Set[Expression] = {
- children
- .map(child => rewriteConstraints(children.head.output, child.output, child.constraints))
- .reduce(merge(_, _))
- }
-}
-
-case class Join(
- left: LogicalPlan,
- right: LogicalPlan,
- joinType: JoinType,
- condition: Option[Expression])
- extends BinaryNode with PredicateHelper {
-
- override def output: Seq[Attribute] = {
- joinType match {
- case LeftExistence(_) =>
- left.output
- case LeftOuter =>
- left.output ++ right.output.map(_.withNullability(true))
- case RightOuter =>
- left.output.map(_.withNullability(true)) ++ right.output
- case FullOuter =>
- left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
- case _ =>
- left.output ++ right.output
- }
- }
-
- override protected def validConstraints: Set[Expression] = {
- joinType match {
- case Inner if condition.isDefined =>
- left.constraints
- .union(right.constraints)
- .union(splitConjunctivePredicates(condition.get).toSet)
- case LeftSemi if condition.isDefined =>
- left.constraints
- .union(splitConjunctivePredicates(condition.get).toSet)
- case Inner =>
- left.constraints.union(right.constraints)
- case LeftExistence(_) =>
- left.constraints
- case LeftOuter =>
- left.constraints
- case RightOuter =>
- right.constraints
- case FullOuter =>
- Set.empty[Expression]
- }
- }
-
- def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
-
- // Joins are only resolved if they don't introduce ambiguous expression ids.
- // NaturalJoin should be ready for resolution only if everything else is resolved here
- lazy val resolvedExceptNatural: Boolean = {
- childrenResolved &&
- expressions.forall(_.resolved) &&
- duplicateResolved &&
- condition.forall(_.dataType == BooleanType)
- }
-
- // If this is not a natural join, use `resolvedExceptNatural`. If it is a natural join or
- // a USING join, we still need to eliminate the natural/using clauses before marking it resolved.
- override lazy val resolved: Boolean = joinType match {
- case NaturalJoin(_) => false
- case UsingJoin(_, _) => false
- case _ => resolvedExceptNatural
- }
-}
-
-/**
- * A hint for the optimizer that we should broadcast the `child` if used in a join operator.
- */
-case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-
- // We manually set the statistics of BroadcastHint to the smallest value to make sure
- // the plan wrapped by BroadcastHint will be considered for broadcast later.
- override def statistics: Statistics = Statistics(sizeInBytes = 1)
-}
-
-case class InsertIntoTable(
- table: LogicalPlan,
- partition: Map[String, Option[String]],
- child: LogicalPlan,
- overwrite: Boolean,
- ifNotExists: Boolean)
- extends LogicalPlan {
-
- override def children: Seq[LogicalPlan] = child :: Nil
- override def output: Seq[Attribute] = Seq.empty
-
- assert(overwrite || !ifNotExists)
- override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
- case (childAttr, tableAttr) =>
- DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
- }
-}
-
-/**
- * A container for holding named common table expressions (CTEs) and a query plan.
- * This operator will be removed during analysis and the relations will be substituted into child.
- *
- * @param child The final query of this CTE.
- * @param cteRelations The queries that this CTE defines;
- * the key is the alias of a CTE definition,
- * the value is the CTE definition itself.
- */
-case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
-
-case class WithWindowDefinition(
- windowDefinitions: Map[String, WindowSpecDefinition],
- child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
-
-/**
- * @param order The ordering expressions
- * @param global True means global sorting applies to the entire data set;
- * False means sorting applies only within each partition.
- * @param child Child logical plan
- */
-case class Sort(
- order: Seq[SortOrder],
- global: Boolean,
- child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
- override def maxRows: Option[Long] = child.maxRows
-}
-
-/** Factory for constructing new `Range` nodes. */
-object Range {
- def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
- val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
- new Range(start, end, step, numSlices, output)
- }
-}
-
-case class Range(
- start: Long,
- end: Long,
- step: Long,
- numSlices: Int,
- output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
- require(step != 0, "step cannot be 0")
- val numElements: BigInt = {
- val safeStart = BigInt(start)
- val safeEnd = BigInt(end)
- if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
- (safeEnd - safeStart) / step
- } else {
- // the remainder has the same sign as the range, so one more element fits
- (safeEnd - safeStart) / step + 1
- }
- }
-
- override def newInstance(): Range =
- Range(start, end, step, numSlices, output.map(_.newInstance()))
-
- override def statistics: Statistics = {
- val sizeInBytes = LongType.defaultSize * numElements
- Statistics(sizeInBytes = sizeInBytes)
- }
-}
-
-case class Aggregate(
- groupingExpressions: Seq[Expression],
- aggregateExpressions: Seq[NamedExpression],
- child: LogicalPlan)
- extends UnaryNode {
-
- override lazy val resolved: Boolean = {
- val hasWindowExpressions = aggregateExpressions.exists ( _.collect {
- case window: WindowExpression => window
- }.nonEmpty
- )
-
- !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
- }
-
- override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
- override def maxRows: Option[Long] = child.maxRows
-
- override def validConstraints: Set[Expression] =
- child.constraints.union(getAliasedConstraints(aggregateExpressions))
-
- override def statistics: Statistics = {
- if (groupingExpressions.isEmpty) {
- Statistics(sizeInBytes = 1)
- } else {
- super.statistics
- }
- }
-}
-
-case class Window(
- windowExpressions: Seq[NamedExpression],
- partitionSpec: Seq[Expression],
- orderSpec: Seq[SortOrder],
- child: LogicalPlan) extends UnaryNode {
-
- override def output: Seq[Attribute] =
- child.output ++ windowExpressions.map(_.toAttribute)
-
- def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute))
-}
-
-private[sql] object Expand {
- /**
- * Extract attribute set according to the grouping id.
- *
- * @param bitmask bitmask representing which attributes of the sequence are selected
- * @param attrs the attributes in sequence
- * @return the attributes that are not selected, as specified via the bitmask (with the bit set to 1)
- */
- private def buildNonSelectAttrSet(
- bitmask: Int,
- attrs: Seq[Attribute]): AttributeSet = {
- val nonSelect = new ArrayBuffer[Attribute]()
-
- var bit = attrs.length - 1
- while (bit >= 0) {
- if (((bitmask >> bit) & 1) == 1) nonSelect += attrs(attrs.length - bit - 1)
- bit -= 1
- }
-
- AttributeSet(nonSelect)
- }
-
- /**
- * Apply all of the GroupExpressions to every input row; hence we will get
- * multiple output rows for an input row.
- *
- * @param bitmasks The bitmask set represents the grouping sets
- * @param groupByAliases The aliased original group by expressions
- * @param groupByAttrs The attributes of aliased group by expressions
- * @param gid Attribute of the grouping id
- * @param child Child operator
- */
- def apply(
- bitmasks: Seq[Int],
- groupByAliases: Seq[Alias],
- groupByAttrs: Seq[Attribute],
- gid: Attribute,
- child: LogicalPlan): Expand = {
- // Create an array of Projections for the child projection, and replace the projections'
- // expressions which equal GroupBy expressions with Literal(null), if those expressions
- // are not set for this grouping set (according to the bit mask).
- val projections = bitmasks.map { bitmask =>
- // get the non-selected grouping attributes according to the bitmask
- val nonSelectedGroupAttrSet = buildNonSelectAttrSet(bitmask, groupByAttrs)
-
- child.output ++ groupByAttrs.map { attr =>
- if (nonSelectedGroupAttrSet.contains(attr)) {
- // if the input attribute is in the non-selected grouping expression set for this group,
- // replace it with a constant null
- Literal.create(null, attr.dataType)
- } else {
- attr
- }
- // groupingId is the last output, here we use the bit mask as the concrete value for it.
- } :+ Literal.create(bitmask, IntegerType)
- }
-
- // the `groupByAttrs` have a different meaning in `Expand.output`: each could be the original
- // grouping expression or null, so here we create new instances of them.
- val output = child.output ++ groupByAttrs.map(_.newInstance) :+ gid
- Expand(projections, output, Project(child.output ++ groupByAliases, child))
- }
-}
-
-/**
- * Apply a number of projections to every input row, hence we will get multiple output rows for
- * an input row.
- *
- * @param projections to apply
- * @param output of all projections.
- * @param child operator.
- */
-case class Expand(
- projections: Seq[Seq[Expression]],
- output: Seq[Attribute],
- child: LogicalPlan) extends UnaryNode {
- override def references: AttributeSet =
- AttributeSet(projections.flatten.flatMap(_.references))
-
- override def statistics: Statistics = {
- val sizeInBytes = super.statistics.sizeInBytes * projections.length
- Statistics(sizeInBytes = sizeInBytes)
- }
-
- // This operator can reuse attributes (for example making them null when doing a roll up) so
- // the constraints of the child may no longer be valid.
- override protected def validConstraints: Set[Expression] = Set.empty[Expression]
-}
-
-/**
- * A GROUP BY clause with GROUPING SETS can generate a result set equivalent
- * to that generated by a UNION ALL of multiple simple GROUP BY clauses.
- *
- * We will transform GROUPING SETS into the logical plan Aggregate(.., Expand) in the Analyzer.
- *
- * @param bitmasks A list of bitmasks, each of which indicates the selected
- * GroupBy expressions
- * @param groupByExprs The Group By expression candidates, which take effect only if the
- * associated bit in the bitmask is set to 1
- * @param child Child operator
- * @param aggregations The aggregation expressions; non-selected group by expressions
- * are treated as constant null if they appear in these expressions
- */
-case class GroupingSets(
- bitmasks: Seq[Int],
- groupByExprs: Seq[Expression],
- child: LogicalPlan,
- aggregations: Seq[NamedExpression]) extends UnaryNode {
-
- override def output: Seq[Attribute] = aggregations.map(_.toAttribute)
-
- // Needs to be unresolved before it's translated to Aggregate + Expand because output attributes
- // will change in analysis.
- override lazy val resolved: Boolean = false
-}
-
-case class Pivot(
- groupByExprs: Seq[NamedExpression],
- pivotColumn: Expression,
- pivotValues: Seq[Literal],
- aggregates: Seq[Expression],
- child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = groupByExprs.map(_.toAttribute) ++ aggregates match {
- case agg :: Nil => pivotValues.map(value => AttributeReference(value.toString, agg.dataType)())
- case _ => pivotValues.flatMap{ value =>
- aggregates.map(agg => AttributeReference(value + "_" + agg.sql, agg.dataType)())
- }
- }
-}
-
-object Limit {
- def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = {
- GlobalLimit(limitExpr, LocalLimit(limitExpr, child))
- }
-
- def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = {
- p match {
- case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
- case _ => None
- }
- }
-}
-
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
- override def maxRows: Option[Long] = {
- limitExpr match {
- case IntegerLiteral(limit) => Some(limit)
- case _ => None
- }
- }
- override lazy val statistics: Statistics = {
- val limit = limitExpr.eval().asInstanceOf[Int]
- val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
- Statistics(sizeInBytes = sizeInBytes)
- }
-}
-
-case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
- override def maxRows: Option[Long] = {
- limitExpr match {
- case IntegerLiteral(limit) => Some(limit)
- case _ => None
- }
- }
- override lazy val statistics: Statistics = {
- val limit = limitExpr.eval().asInstanceOf[Int]
- val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
- Statistics(sizeInBytes = sizeInBytes)
- }
-}
-
-case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
-
- override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
-}
-
-/**
- * Sample the dataset.
- *
- * @param lowerBound Lower-bound of the sampling probability (usually 0.0)
- * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
- * will be ub - lb.
- * @param withReplacement Whether to sample with replacement.
- * @param seed the random seed
- * @param child the LogicalPlan
- * @param isTableSample Whether this node was created from TABLESAMPLE in the parser.
- */
-case class Sample(
- lowerBound: Double,
- upperBound: Double,
- withReplacement: Boolean,
- seed: Long,
- child: LogicalPlan)(
- val isTableSample: java.lang.Boolean = false) extends UnaryNode {
-
- override def output: Seq[Attribute] = child.output
-
- override def statistics: Statistics = {
- val ratio = upperBound - lowerBound
- // BigInt can't multiply with Double
- var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100
- if (sizeInBytes == 0) {
- sizeInBytes = 1
- }
- Statistics(sizeInBytes = sizeInBytes)
- }
-
- override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil
-}
-
-/**
- * Returns a new logical plan that dedups input rows.
- */
-case class Distinct(child: LogicalPlan) extends UnaryNode {
- override def maxRows: Option[Long] = child.maxRows
- override def output: Seq[Attribute] = child.output
-}
-
-/**
- * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
- * [[RepartitionByExpression]] in that this node is created directly by the DataFrame API when
- * the user asks for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the
- * consumer of the output requires some specific ordering or distribution of the data.
- */
-case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
- extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-}
-
-/**
- * A relation with one row. This is used in "SELECT ..." without a from clause.
- */
-case object OneRowRelation extends LeafNode {
- override def maxRows: Option[Long] = Some(1)
- override def output: Seq[Attribute] = Nil
-
- /**
- * Computes [[Statistics]] for this plan. The default implementation assumes the output
- * cardinality is the product of all child plans' cardinalities, i.e. it applies in the case
- * of cartesian joins.
- *
- * [[LeafNode]]s must override this.
- */
- override def statistics: Statistics = Statistics(sizeInBytes = 1)
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/5c8a0ec9/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
deleted file mode 100644
index 83f527f..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ /dev/null
@@ -1,530 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.LongType
-import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
-
-/** Physical plan for Project. */
-case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
- extends UnaryExecNode with CodegenSupport {
-
- override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-
- override def inputRDDs(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].inputRDDs()
- }
-
- protected override def doProduce(ctx: CodegenContext): String = {
- child.asInstanceOf[CodegenSupport].produce(ctx, this)
- }
-
- override def usedInputs: AttributeSet = {
- // only the attributes that are used at least twice should be evaluated before this plan;
- // otherwise we can defer the evaluation until the output attribute is actually used.
- val usedExprIds = projectList.flatMap(_.collect {
- case a: Attribute => a.exprId
- })
- val usedMoreThanOnce = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet
- references.filter(a => usedMoreThanOnce.contains(a.exprId))
- }
-
- override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
- val exprs = projectList.map(x =>
- ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output)))
- ctx.currentVars = input
- val resultVars = exprs.map(_.genCode(ctx))
- // Evaluation of non-deterministic expressions can't be deferred.
- val nonDeterministicAttrs = projectList.filterNot(_.deterministic).map(_.toAttribute)
- s"""
- |${evaluateRequiredVariables(output, resultVars, AttributeSet(nonDeterministicAttrs))}
- |${consume(ctx, resultVars)}
- """.stripMargin
- }
-
- protected override def doExecute(): RDD[InternalRow] = {
- child.execute().mapPartitionsInternal { iter =>
- val project = UnsafeProjection.create(projectList, child.output,
- subexpressionEliminationEnabled)
- iter.map(project)
- }
- }
-
- override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-}
-
-
-/** Physical plan for Filter. */
-case class FilterExec(condition: Expression, child: SparkPlan)
- extends UnaryExecNode with CodegenSupport with PredicateHelper {
-
- // Split out all the IsNotNulls from condition.
- private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
- case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true
- case _ => false
- }
-
- // The columns that will be filtered out by `IsNotNull` can be considered not nullable.
- private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)
-
- // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate
- // all the variables up front, so that we can take advantage of short-circuiting.
- override def usedInputs: AttributeSet = AttributeSet.empty
-
- override def output: Seq[Attribute] = {
- child.output.map { a =>
- if (a.nullable && notNullAttributes.contains(a.exprId)) {
- a.withNullability(false)
- } else {
- a
- }
- }
- }
-
- private[sql] override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
- override def inputRDDs(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].inputRDDs()
- }
-
- protected override def doProduce(ctx: CodegenContext): String = {
- child.asInstanceOf[CodegenSupport].produce(ctx, this)
- }
-
- override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
- val numOutput = metricTerm(ctx, "numOutputRows")
-
- /**
- * Generates code for `c`, using `in` for input attributes and `attrs` for nullability.
- */
- def genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String = {
- val bound = BindReferences.bindReference(c, attrs)
- val evaluated = evaluateRequiredVariables(child.output, in, c.references)
-
- // Generate the code for the predicate.
- val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
- val nullCheck = if (bound.nullable) {
- s"${ev.isNull} || "
- } else {
- s""
- }
-
- s"""
- |$evaluated
- |${ev.code}
- |if (${nullCheck}!${ev.value}) continue;
- """.stripMargin
- }
-
- ctx.currentVars = input
-
- // To generate the predicates we follow this algorithm:
- // For each predicate that is not IsNotNull, we generate it one at a time, loading attributes
- // as necessary. For each attribute such a predicate references, if there is an IsNotNull
- // predicate on it, we generate that check *before* the predicate. After all of these
- // predicates, we generate the remaining IsNotNull checks that were not part of other
- // predicates. This avoids redundant IsNotNull checks and takes better advantage of
- // short-circuiting by not loading attributes until they are needed.
- // This is very performance sensitive. (A toy sketch of the split appears after this class.)
- // TODO: revisit this. We can consider reordering predicates as well.
- val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
- val generated = otherPreds.map { c =>
- val nullChecks = c.references.map { r =>
- val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
- if (idx != -1 && !generatedIsNotNullChecks(idx)) {
- generatedIsNotNullChecks(idx) = true
- // Use the child's output. The nullability is what the child produced.
- genPredicate(notNullPreds(idx), input, child.output)
- } else {
- ""
- }
- }.mkString("\n").trim
-
- // Here we use *this* operator's output, with the nullability it declares, since the
- // IsNotNull checks above have already enforced it.
- s"""
- |$nullChecks
- |${genPredicate(c, input, output)}
- """.stripMargin.trim
- }.mkString("\n")
-
- val nullChecks = notNullPreds.zipWithIndex.map { case (c, idx) =>
- if (!generatedIsNotNullChecks(idx)) {
- genPredicate(c, input, child.output)
- } else {
- ""
- }
- }.mkString("\n")
-
- // Reset isNull to false for the not-null columns, so that downstream operators can
- // generate better code (e.g. remove dead branches).
- val resultVars = input.zipWithIndex.map { case (ev, i) =>
- if (notNullAttributes.contains(child.output(i).exprId)) {
- ev.isNull = "false"
- }
- ev
- }
-
- s"""
- |$generated
- |$nullChecks
- |$numOutput.add(1);
- |${consume(ctx, resultVars)}
- """.stripMargin
- }
-
- protected override def doExecute(): RDD[InternalRow] = {
- val numOutputRows = longMetric("numOutputRows")
- child.execute().mapPartitionsInternal { iter =>
- val predicate = newPredicate(condition, child.output)
- iter.filter { row =>
- val r = predicate(row)
- if (r) numOutputRows += 1
- r
- }
- }
- }
-
- override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-}
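
The key move in doConsume above is splitting the conjuncts into IsNotNull checks and everything else, so the cheap null checks can run first. A standalone toy sketch of that split (the Pred/IsNotNullP/OtherP types are invented here for illustration):

    sealed trait Pred
    case class IsNotNullP(attr: String) extends Pred
    case class OtherP(desc: String, refs: Set[String]) extends Pred

    // Partition conjuncts into null checks and the remaining predicates, in the
    // spirit of splitConjunctivePredicates + partition in FilterExec.
    def split(conjuncts: Seq[Pred]): (Seq[IsNotNullP], Seq[OtherP]) = {
      val notNull = conjuncts.collect { case p: IsNotNullP => p }
      val others  = conjuncts.collect { case p: OtherP     => p }
      (notNull, others)
    }

    // split(Seq(IsNotNullP("a"), OtherP("a > 1", Set("a"))))
    //   == (List(IsNotNullP("a")), List(OtherP("a > 1", Set("a"))))
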
-
-/**
- * Physical plan for sampling the dataset.
- *
- * @param lowerBound Lower bound of the sampling probability (usually 0.0).
- * @param upperBound Upper bound of the sampling probability. The expected fraction sampled
- * will be upperBound - lowerBound.
- * @param withReplacement Whether to sample with replacement.
- * @param seed The random seed.
- * @param child The child plan to sample from.
- */
-case class SampleExec(
- lowerBound: Double,
- upperBound: Double,
- withReplacement: Boolean,
- seed: Long,
- child: SparkPlan) extends UnaryExecNode with CodegenSupport {
- override def output: Seq[Attribute] = child.output
-
- private[sql] override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
- protected override def doExecute(): RDD[InternalRow] = {
- if (withReplacement) {
- // Disable gap sampling: it buffers two rows internally, requiring us to copy the row,
- // which is more expensive than just calling the random number generator.
- new PartitionwiseSampledRDD[InternalRow, InternalRow](
- child.execute(),
- new PoissonSampler[InternalRow](upperBound - lowerBound, useGapSamplingIfPossible = false),
- preservesPartitioning = true,
- seed)
- } else {
- child.execute().randomSampleWithRange(lowerBound, upperBound, seed)
- }
- }
-
- override def inputRDDs(): Seq[RDD[InternalRow]] = {
- child.asInstanceOf[CodegenSupport].inputRDDs()
- }
-
- protected override def doProduce(ctx: CodegenContext): String = {
- child.asInstanceOf[CodegenSupport].produce(ctx, this)
- }
-
- override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
- val numOutput = metricTerm(ctx, "numOutputRows")
- val sampler = ctx.freshName("sampler")
-
- if (withReplacement) {
- val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName
- val initSampler = ctx.freshName("initSampler")
- ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
- s"$initSampler();")
-
- ctx.addNewFunction(initSampler,
- s"""
- | private void $initSampler() {
- | $sampler = new $samplerClass<UnsafeRow>($upperBound - $lowerBound, false);
- | java.util.Random random = new java.util.Random(${seed}L);
- | long randomSeed = random.nextLong();
- | int loopCount = 0;
- | while (loopCount < partitionIndex) {
- | randomSeed = random.nextLong();
- | loopCount += 1;
- | }
- | $sampler.setSeed(randomSeed);
- | }
- """.stripMargin.trim)
-
- val samplingCount = ctx.freshName("samplingCount")
- s"""
- | int $samplingCount = $sampler.sample();
- | while ($samplingCount-- > 0) {
- | $numOutput.add(1);
- | ${consume(ctx, input)}
- | }
- """.stripMargin.trim
- } else {
- val samplerClass = classOf[BernoulliCellSampler[UnsafeRow]].getName
- ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
- s"""
- | $sampler = new $samplerClass<UnsafeRow>($lowerBound, $upperBound, false);
- | $sampler.setSeed(${seed}L + partitionIndex);
- """.stripMargin.trim)
-
- s"""
- | if ($sampler.sample() == 0) continue;
- | $numOutput.add(1);
- | ${consume(ctx, input)}
- """.stripMargin.trim
- }
- }
-}
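
Both the interpreted and generated paths pick the sampler the same way: Poisson for with-replacement (expected count per row is upperBound - lowerBound), Bernoulli range sampling otherwise. A sketch of that choice against Spark's sampler utilities (assuming the same DeveloperApi classes used above are on the classpath):

    import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler, RandomSampler}

    def makeSampler[T](withReplacement: Boolean, lb: Double, ub: Double): RandomSampler[T, T] =
      if (withReplacement) {
        // Gap sampling disabled for the same row-copying reason as in doExecute().
        new PoissonSampler[T](ub - lb, useGapSamplingIfPossible = false)
      } else {
        new BernoulliCellSampler[T](lb, ub)
      }
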
-
-
-/**
- * Physical plan for range (generating a range of 64-bit numbers).
- *
- * @param start first number in the range, inclusive.
- * @param step size of the step increment.
- * @param numSlices number of partitions.
- * @param numElements total number of elements to output.
- * @param output output attributes.
- */
-case class RangeExec(
- start: Long,
- step: Long,
- numSlices: Int,
- numElements: BigInt,
- output: Seq[Attribute])
- extends LeafExecNode with CodegenSupport {
-
- private[sql] override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
- // output attributes should not affect the results
- override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements)
-
- override def inputRDDs(): Seq[RDD[InternalRow]] = {
- sqlContext.sparkContext.parallelize(0 until numSlices, numSlices)
- .map(i => InternalRow(i)) :: Nil
- }
-
- protected override def doProduce(ctx: CodegenContext): String = {
- val numOutput = metricTerm(ctx, "numOutputRows")
-
- val initTerm = ctx.freshName("initRange")
- ctx.addMutableState("boolean", initTerm, s"$initTerm = false;")
- val partitionEnd = ctx.freshName("partitionEnd")
- ctx.addMutableState("long", partitionEnd, s"$partitionEnd = 0L;")
- val number = ctx.freshName("number")
- ctx.addMutableState("long", number, s"$number = 0L;")
- val overflow = ctx.freshName("overflow")
- ctx.addMutableState("boolean", overflow, s"$overflow = false;")
-
- val value = ctx.freshName("value")
- val ev = ExprCode("", "false", value)
- val BigInt = classOf[java.math.BigInteger].getName
- val checkEnd = if (step > 0) {
- s"$number < $partitionEnd"
- } else {
- s"$number > $partitionEnd"
- }
-
- ctx.addNewFunction("initRange",
- s"""
- | private void initRange(int idx) {
- | $BigInt index = $BigInt.valueOf(idx);
- | $BigInt numSlice = $BigInt.valueOf(${numSlices}L);
- | $BigInt numElement = $BigInt.valueOf(${numElements.toLong}L);
- | $BigInt step = $BigInt.valueOf(${step}L);
- | $BigInt start = $BigInt.valueOf(${start}L);
- |
- | $BigInt st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
- | if (st.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) {
- | $number = Long.MAX_VALUE;
- | } else if (st.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) {
- | $number = Long.MIN_VALUE;
- | } else {
- | $number = st.longValue();
- | }
- |
- | $BigInt end = index.add($BigInt.ONE).multiply(numElement).divide(numSlice)
- | .multiply(step).add(start);
- | if (end.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) {
- | $partitionEnd = Long.MAX_VALUE;
- | } else if (end.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) {
- | $partitionEnd = Long.MIN_VALUE;
- | } else {
- | $partitionEnd = end.longValue();
- | }
- |
- | $numOutput.add(($partitionEnd - $number) / ${step}L);
- | }
- """.stripMargin)
-
- val input = ctx.freshName("input")
- // Right now, Range is only used when there is one upstream.
- ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
- s"""
- | // initialize Range
- | if (!$initTerm) {
- | $initTerm = true;
- | initRange(partitionIndex);
- | }
- |
- | while (!$overflow && $checkEnd) {
- | long $value = $number;
- | $number += ${step}L;
- | if ($number < $value ^ ${step}L < 0) {
- | $overflow = true;
- | }
- | ${consume(ctx, Seq(ev))}
- | if (shouldStop()) return;
- | }
- """.stripMargin
- }
-
- protected override def doExecute(): RDD[InternalRow] = {
- val numOutputRows = longMetric("numOutputRows")
- sqlContext
- .sparkContext
- .parallelize(0 until numSlices, numSlices)
- .mapPartitionsWithIndex { (i, _) =>
- val partitionStart = (i * numElements) / numSlices * step + start
- val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
- def getSafeMargin(bi: BigInt): Long =
- if (bi.isValidLong) {
- bi.toLong
- } else if (bi > 0) {
- Long.MaxValue
- } else {
- Long.MinValue
- }
- val safePartitionStart = getSafeMargin(partitionStart)
- val safePartitionEnd = getSafeMargin(partitionEnd)
- val rowSize = UnsafeRow.calculateBitSetWidthInBytes(1) + LongType.defaultSize
- val unsafeRow = UnsafeRow.createFromByteArray(rowSize, 1)
-
- new Iterator[InternalRow] {
- private[this] var number: Long = safePartitionStart
- private[this] var overflow: Boolean = false
-
- override def hasNext =
- if (!overflow) {
- if (step > 0) {
- number < safePartitionEnd
- } else {
- number > safePartitionEnd
- }
- } else false
-
- override def next() = {
- val ret = number
- number += step
- if (number < ret ^ step < 0) {
- // Overflow wraps around: Long.MaxValue + Long.MaxValue < Long.MaxValue and
- // Long.MinValue + Long.MinValue > Long.MinValue, so if adding the step moved the
- // value in the direction opposite to the step's sign, we have overflowed.
- overflow = true
- }
-
- numOutputRows += 1
- unsafeRow.setLong(0, ret)
- unsafeRow
- }
- }
- }
- }
-}
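
The partition boundary math in doExecute, and the overflow test shared by next() and the generated loop, can be isolated into a small sketch (the function names here are invented for illustration):

    // Start/end of partition i, computed in BigInt to avoid intermediate
    // overflow and then clamped into the Long range, as getSafeMargin does.
    def partitionBounds(i: Int, numSlices: Int, numElements: BigInt,
        step: Long, start: Long): (Long, Long) = {
      def clamp(bi: BigInt): Long =
        if (bi.isValidLong) bi.toLong
        else if (bi > 0) Long.MaxValue
        else Long.MinValue
      val lo = i * numElements / numSlices * step + start
      val hi = (i + 1) * numElements / numSlices * step + start
      (clamp(lo), clamp(hi))
    }

    // After `after = before + step`, wrap-around shows up as the delta's sign
    // disagreeing with the step's sign.
    def overflowed(before: Long, after: Long, step: Long): Boolean =
      (after < before) ^ (step < 0)
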
-
-/**
- * Physical plan for unioning any number of plans, without eliminating duplicates.
- * This is UNION ALL in SQL.
- */
-case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
- override def output: Seq[Attribute] =
- children.map(_.output).transpose.map(attrs =>
- attrs.head.withNullability(attrs.exists(_.nullable)))
-
- protected override def doExecute(): RDD[InternalRow] =
- sparkContext.union(children.map(_.execute()))
-}
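
The output computation above merges nullability column-wise: a union column is nullable if it is nullable in any child. A toy version over (name, nullable) pairs:

    def unionNullability(children: Seq[Seq[(String, Boolean)]]): Seq[(String, Boolean)] =
      children.transpose.map { col => (col.head._1, col.exists(_._2)) }

    // unionNullability(Seq(Seq(("a", false)), Seq(("a", true)))) == Seq(("a", true))
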
-
-/**
- * Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
- * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency:
- * e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead,
- * each of the 100 new partitions will claim 10 of the current partitions.
- */
-case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
- override def output: Seq[Attribute] = child.output
-
- override def outputPartitioning: Partitioning = {
- if (numPartitions == 1) SinglePartition
- else UnknownPartitioning(numPartitions)
- }
-
- protected override def doExecute(): RDD[InternalRow] = {
- child.execute().coalesce(numPartitions, shuffle = false)
- }
-}
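
The narrow dependency is easy to observe from a spark-shell session, where `spark` is the predefined SparkSession (a sketch; the partition counts are illustrative):

    val df = spark.range(1000).repartition(100)   // 100 partitions via a shuffle
    val few = df.coalesce(10)                     // narrow dependency: no shuffle
    println(few.rdd.getNumPartitions)             // 10
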
-
-/**
- * Physical plan for returning a table with the elements from left that are not in right,
- * using Spark's built-in RDD subtract function.
- */
-case class ExceptExec(left: SparkPlan, right: SparkPlan) extends BinaryExecNode {
- override def output: Seq[Attribute] = left.output
-
- protected override def doExecute(): RDD[InternalRow] = {
- left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
- }
-}
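
At the API level, this is what EXCEPT queries and Dataset.except plan to in this version. For example, in spark-shell:

    val left  = spark.range(0, 10)
    val right = spark.range(5, 15)
    left.except(right).orderBy("id").show()   // ids 0 through 4
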
-
-/**
- * A plan node that does nothing but lie about the output of its child. Used to splice a
- * (hopefully structurally equivalent) tree from a different optimization sequence into an already
- * resolved tree.
- */
-case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
- def children: Seq[SparkPlan] = child :: Nil
-
- protected override def doExecute(): RDD[InternalRow] = child.execute()
-}
-
-/**
- * Physical plan for a subquery.
- *
- * This is used to generate the tree string for SparkScalarSubquery.
- */
-case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
- override def output: Seq[Attribute] = child.output
- override def outputPartitioning: Partitioning = child.outputPartitioning
- override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
- protected override def doExecute(): RDD[InternalRow] = {
- throw new UnsupportedOperationException
- }
-}