Posted to commits@spark.apache.org by li...@apache.org on 2018/07/30 21:05:48 UTC
spark git commit: [SPARK-24865][SQL] Remove AnalysisBarrier addendum
Repository: spark
Updated Branches:
refs/heads/master d6b7545b5 -> abbb4ab4d
[SPARK-24865][SQL] Remove AnalysisBarrier addendum
## What changes were proposed in this pull request?
I didn't want to pollute the diff in the previous PR, so I left some TODOs behind. This follow-up addresses them: it removes the planWithBarrier alias from Dataset and RelationalGroupedDataset, updates the tests that referenced it, and drops a stale TODO comment in Analyzer.
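For readers of this log, the mechanical core of the follow-up: the previous PR (SPARK-24865) removed the AnalysisBarrier wrapper node, leaving planWithBarrier behind in Dataset as a plain alias of logicalPlan. This commit deletes the alias and points every remaining call site at logicalPlan directly. A simplified before/after, condensed from the diff below:

    // Before this follow-up: the alias had already lost its barrier.
    @transient private[sql] val planWithBarrier = logicalPlan

    def as(alias: String): Dataset[T] = withTypedPlan {
      SubqueryAlias(alias, planWithBarrier)
    }

    // After: the alias is gone; call sites use logicalPlan directly.
    def as(alias: String): Dataset[T] = withTypedPlan {
      SubqueryAlias(alias, logicalPlan)
    }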
## How was this patch tested?
Should be covered by existing tests.
Author: Reynold Xin <rx...@databricks.com>
Closes #21896 from rxin/SPARK-24865-addendum.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abbb4ab4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abbb4ab4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abbb4ab4
Branch: refs/heads/master
Commit: abbb4ab4d8b12ba2d94b16407c0d62ae207ee4fa
Parents: d6b7545
Author: Reynold Xin <rx...@databricks.com>
Authored: Mon Jul 30 14:05:45 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Mon Jul 30 14:05:45 2018 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 1 -
.../scala/org/apache/spark/sql/Dataset.scala | 88 ++++++++++----------
.../spark/sql/RelationalGroupedDataset.scala | 13 ++-
.../continuous/ContinuousAggregationSuite.scala | 4 +-
4 files changed, 50 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/abbb4ab4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9965cd6..1488ede 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -799,7 +799,6 @@ class Analyzer(
right
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
- // TODO(rxin): Why do we need transformUp here?
right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
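A note on the Analyzer.scala hunk above: the deleted TODO asked why transformUp is needed when de-duplicating a self-join, and the commit resolves it by simply dropping the question and keeping the code as-is. What the two chained passes do, abridged (assumed shape, not verbatim; see Analyzer.scala for the authoritative body):

    // First pass: swap the duplicated relation for a freshly instantiated one.
    // Second pass: remap attribute references from the old relation's output
    // to the new relation's output, using the AttributeMap built above.
    right transformUp {
      case r if r == oldRelation => newRelation
    } transformUp {
      case plan => plan transformExpressions {
        case a: Attribute => attributeRewrites.getOrElse(a, a)
      }
    }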
http://git-wip-us.apache.org/repos/asf/spark/blob/abbb4ab4/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d36c8d1..3b0a6d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -195,10 +195,6 @@ class Dataset[T] private[sql](
}
}
- // Wraps analyzed logical plans with an analysis barrier so we won't traverse/resolve it again.
- // TODO(rxin): remove this later.
- @transient private[sql] val planWithBarrier = logicalPlan
-
/**
* Currently [[ExpressionEncoder]] is the only implementation of [[Encoder]], here we turn the
* passed in encoder to [[ExpressionEncoder]] explicitly, and mark it implicit so that we can use
@@ -427,7 +423,7 @@ class Dataset[T] private[sql](
*/
@Experimental
@InterfaceStability.Evolving
- def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, planWithBarrier)
+ def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, logicalPlan)
/**
* Converts this strongly typed collection of data to generic `DataFrame` with columns renamed.
@@ -681,7 +677,7 @@ class Dataset[T] private[sql](
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
s"delay threshold ($delayThreshold) should not be negative.")
EliminateEventTimeWatermark(
- EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, planWithBarrier))
+ EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
}
/**
@@ -854,7 +850,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def join(right: Dataset[_]): DataFrame = withPlan {
- Join(planWithBarrier, right.planWithBarrier, joinType = Inner, None)
+ Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
}
/**
@@ -932,7 +928,7 @@ class Dataset[T] private[sql](
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
val joined = sparkSession.sessionState.executePlan(
- Join(planWithBarrier, right.planWithBarrier, joinType = JoinType(joinType), None))
+ Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
.analyzed.asInstanceOf[Join]
withPlan {
@@ -993,7 +989,7 @@ class Dataset[T] private[sql](
// Trigger analysis so in the case of self-join, the analyzer will clone the plan.
// After the cloning, left and right side will have distinct expression ids.
val plan = withPlan(
- Join(planWithBarrier, right.planWithBarrier, JoinType(joinType), Some(joinExprs.expr)))
+ Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
.queryExecution.analyzed.asInstanceOf[Join]
// If auto self join alias is disabled, return the plan.
@@ -1002,8 +998,8 @@ class Dataset[T] private[sql](
}
// If left/right have no output set intersection, return the plan.
- val lanalyzed = withPlan(this.planWithBarrier).queryExecution.analyzed
- val ranalyzed = withPlan(right.planWithBarrier).queryExecution.analyzed
+ val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
+ val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
return withPlan(plan)
}
@@ -1040,7 +1036,7 @@ class Dataset[T] private[sql](
* @since 2.1.0
*/
def crossJoin(right: Dataset[_]): DataFrame = withPlan {
- Join(planWithBarrier, right.planWithBarrier, joinType = Cross, None)
+ Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
}
/**
@@ -1072,8 +1068,8 @@ class Dataset[T] private[sql](
// etc.
val joined = sparkSession.sessionState.executePlan(
Join(
- this.planWithBarrier,
- other.planWithBarrier,
+ this.logicalPlan,
+ other.logicalPlan,
JoinType(joinType),
Some(condition.expr))).analyzed.asInstanceOf[Join]
@@ -1294,7 +1290,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def as(alias: String): Dataset[T] = withTypedPlan {
- SubqueryAlias(alias, planWithBarrier)
+ SubqueryAlias(alias, logicalPlan)
}
/**
@@ -1332,7 +1328,7 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame = withPlan {
- Project(cols.map(_.named), planWithBarrier)
+ Project(cols.map(_.named), logicalPlan)
}
/**
@@ -1387,8 +1383,8 @@ class Dataset[T] private[sql](
@InterfaceStability.Evolving
def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
implicit val encoder = c1.encoder
- val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
- planWithBarrier)
+ val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil,
+ logicalPlan)
if (encoder.flat) {
new Dataset[U1](sparkSession, project, encoder)
@@ -1406,8 +1402,8 @@ class Dataset[T] private[sql](
protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
val encoders = columns.map(_.encoder)
val namedColumns =
- columns.map(_.withInputType(exprEnc, planWithBarrier.output).named)
- val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier))
+ columns.map(_.withInputType(exprEnc, logicalPlan.output).named)
+ val execution = new QueryExecution(sparkSession, Project(namedColumns, logicalPlan))
new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
}
@@ -1483,7 +1479,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def filter(condition: Column): Dataset[T] = withTypedPlan {
- Filter(condition.expr, planWithBarrier)
+ Filter(condition.expr, logicalPlan)
}
/**
@@ -1662,7 +1658,7 @@ class Dataset[T] private[sql](
@Experimental
@InterfaceStability.Evolving
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
- val inputPlan = planWithBarrier
+ val inputPlan = logicalPlan
val withGroupingKey = AppendColumns(func, inputPlan)
val executed = sparkSession.sessionState.executePlan(withGroupingKey)
@@ -1808,7 +1804,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def limit(n: Int): Dataset[T] = withTypedPlan {
- Limit(Literal(n), planWithBarrier)
+ Limit(Literal(n), logicalPlan)
}
/**
@@ -1931,7 +1927,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
- Intersect(planWithBarrier, other.planWithBarrier)
+ Intersect(logicalPlan, other.logicalPlan)
}
/**
@@ -1962,7 +1958,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def except(other: Dataset[T]): Dataset[T] = withSetOperator {
- Except(planWithBarrier, other.planWithBarrier)
+ Except(logicalPlan, other.logicalPlan)
}
/**
@@ -2029,7 +2025,7 @@ class Dataset[T] private[sql](
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = {
withTypedPlan {
- Sample(0.0, fraction, withReplacement, seed, planWithBarrier)
+ Sample(0.0, fraction, withReplacement, seed, logicalPlan)
}
}
@@ -2071,15 +2067,15 @@ class Dataset[T] private[sql](
// overlapping splits. To prevent this, we explicitly sort each input partition to make the
// ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
// from the sort order.
- val sortOrder = planWithBarrier.output
+ val sortOrder = logicalPlan.output
.filter(attr => RowOrdering.isOrderable(attr.dataType))
.map(SortOrder(_, Ascending))
val plan = if (sortOrder.nonEmpty) {
- Sort(sortOrder, global = false, planWithBarrier)
+ Sort(sortOrder, global = false, logicalPlan)
} else {
// SPARK-12662: If sort order is empty, we materialize the dataset to guarantee determinism
cache()
- planWithBarrier
+ logicalPlan
}
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
@@ -2163,7 +2159,7 @@ class Dataset[T] private[sql](
withPlan {
Generate(generator, unrequiredChildIndex = Nil, outer = false,
- qualifier = None, generatorOutput = Nil, planWithBarrier)
+ qualifier = None, generatorOutput = Nil, logicalPlan)
}
}
@@ -2204,7 +2200,7 @@ class Dataset[T] private[sql](
withPlan {
Generate(generator, unrequiredChildIndex = Nil, outer = false,
- qualifier = None, generatorOutput = Nil, planWithBarrier)
+ qualifier = None, generatorOutput = Nil, logicalPlan)
}
}
@@ -2355,7 +2351,7 @@ class Dataset[T] private[sql](
u.name, sparkSession.sessionState.analyzer.resolver).getOrElse(u)
case Column(expr: Expression) => expr
}
- val attrs = this.planWithBarrier.output
+ val attrs = this.logicalPlan.output
val colsAfterDrop = attrs.filter { attr =>
attr != expression
}.map(attr => Column(attr))
@@ -2403,7 +2399,7 @@ class Dataset[T] private[sql](
}
cols
}
- Deduplicate(groupCols, planWithBarrier)
+ Deduplicate(groupCols, logicalPlan)
}
/**
@@ -2585,7 +2581,7 @@ class Dataset[T] private[sql](
@Experimental
@InterfaceStability.Evolving
def filter(func: T => Boolean): Dataset[T] = {
- withTypedPlan(TypedFilter(func, planWithBarrier))
+ withTypedPlan(TypedFilter(func, logicalPlan))
}
/**
@@ -2599,7 +2595,7 @@ class Dataset[T] private[sql](
@Experimental
@InterfaceStability.Evolving
def filter(func: FilterFunction[T]): Dataset[T] = {
- withTypedPlan(TypedFilter(func, planWithBarrier))
+ withTypedPlan(TypedFilter(func, logicalPlan))
}
/**
@@ -2613,7 +2609,7 @@ class Dataset[T] private[sql](
@Experimental
@InterfaceStability.Evolving
def map[U : Encoder](func: T => U): Dataset[U] = withTypedPlan {
- MapElements[T, U](func, planWithBarrier)
+ MapElements[T, U](func, logicalPlan)
}
/**
@@ -2628,7 +2624,7 @@ class Dataset[T] private[sql](
@InterfaceStability.Evolving
def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
implicit val uEnc = encoder
- withTypedPlan(MapElements[T, U](func, planWithBarrier))
+ withTypedPlan(MapElements[T, U](func, logicalPlan))
}
/**
@@ -2644,7 +2640,7 @@ class Dataset[T] private[sql](
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
new Dataset[U](
sparkSession,
- MapPartitions[T, U](func, planWithBarrier),
+ MapPartitions[T, U](func, logicalPlan),
implicitly[Encoder[U]])
}
@@ -2675,7 +2671,7 @@ class Dataset[T] private[sql](
val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
Dataset.ofRows(
sparkSession,
- MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier))
+ MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, logicalPlan))
}
/**
@@ -2839,7 +2835,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
- Repartition(numPartitions, shuffle = true, planWithBarrier)
+ Repartition(numPartitions, shuffle = true, logicalPlan)
}
/**
@@ -2862,7 +2858,7 @@ class Dataset[T] private[sql](
|For range partitioning use repartitionByRange(...) instead.
""".stripMargin)
withTypedPlan {
- RepartitionByExpression(partitionExprs.map(_.expr), planWithBarrier, numPartitions)
+ RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
}
}
@@ -2900,7 +2896,7 @@ class Dataset[T] private[sql](
case expr: Expression => SortOrder(expr, Ascending)
})
withTypedPlan {
- RepartitionByExpression(sortOrder, planWithBarrier, numPartitions)
+ RepartitionByExpression(sortOrder, logicalPlan, numPartitions)
}
}
@@ -2939,7 +2935,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
- Repartition(numPartitions, shuffle = false, planWithBarrier)
+ Repartition(numPartitions, shuffle = false, logicalPlan)
}
/**
@@ -3024,7 +3020,7 @@ class Dataset[T] private[sql](
// Represents the `QueryExecution` used to produce the content of the Dataset as an `RDD`.
@transient private lazy val rddQueryExecution: QueryExecution = {
- val deserialized = CatalystSerde.deserialize[T](planWithBarrier)
+ val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized)
}
@@ -3150,7 +3146,7 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
- child = planWithBarrier,
+ child = logicalPlan,
allowExisting = false,
replace = replace,
viewType = viewType)
@@ -3363,7 +3359,7 @@ class Dataset[T] private[sql](
}
}
withTypedPlan {
- Sort(sortOrder, global = global, planWithBarrier)
+ Sort(sortOrder, global = global, logicalPlan)
}
}
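One corner of the Dataset.scala hunks above worth calling out: the self-join paths still analyze the combined plan eagerly so the analyzer can clone one branch and give it distinct expression IDs, exactly as before the rename. A minimal usage sketch (illustrative only; the data and column name are made up, not from the commit):

    val people = spark.range(5).toDF("id")
    // Self-join on identical lineage; analysis clones one side so the
    // duplicated attributes resolve with distinct expression IDs.
    val selfJoined = people.join(people, "id")
    selfJoined.explain(true)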
http://git-wip-us.apache.org/repos/asf/spark/blob/abbb4ab4/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index b068493..8412219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -62,18 +62,17 @@ class RelationalGroupedDataset protected[sql](
groupType match {
case RelationalGroupedDataset.GroupByType =>
- Dataset.ofRows(
- df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.planWithBarrier))
+ Dataset.ofRows(df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.RollupType =>
Dataset.ofRows(
- df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.planWithBarrier))
+ df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.CubeType =>
Dataset.ofRows(
- df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.planWithBarrier))
+ df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
case RelationalGroupedDataset.PivotType(pivotCol, values) =>
val aliasedGrps = groupingExprs.map(alias)
Dataset.ofRows(
- df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.planWithBarrier))
+ df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.logicalPlan))
}
}
@@ -433,7 +432,7 @@ class RelationalGroupedDataset protected[sql](
df.exprEnc.schema,
groupingAttributes,
df.logicalPlan.output,
- df.planWithBarrier))
+ df.logicalPlan))
}
/**
@@ -459,7 +458,7 @@ class RelationalGroupedDataset protected[sql](
case other => Alias(other, other.toString)()
}
val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
- val child = df.planWithBarrier
+ val child = df.logicalPlan
val project = Project(groupingNamedExpressions ++ child.output, child)
val output = expr.dataType.asInstanceOf[StructType].toAttributes
val plan = FlatMapGroupsInPandas(groupingAttributes, expr, output, project)
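For orientation, the four group types dispatched in the match above map to these public entry points (a usage sketch with made-up column names, assuming import org.apache.spark.sql.functions._; not part of the commit):

    df.groupBy("dept").agg(avg("salary"))                 // GroupByType
    df.rollup("dept", "team").agg(count("salary"))        // RollupType
    df.cube("dept", "team").agg(count("salary"))          // CubeType
    df.groupBy("dept").pivot("year").agg(sum("salary"))   // PivotType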
http://git-wip-us.apache.org/repos/asf/spark/blob/abbb4ab4/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
index 0223812..c5b95fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
@@ -74,7 +74,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
val df = input.toDF()
.select('value as 'copy, 'value)
.where('copy =!= 1)
- .planWithBarrier
+ .logicalPlan
.coalesce(1)
.where('copy =!= 2)
.agg(max('value))
@@ -95,7 +95,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
val df = input.toDF()
.coalesce(1)
- .planWithBarrier
+ .logicalPlan
.coalesce(1)
.select('value as 'copy, 'value)
.agg(max('value))
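The test hunks above replace the last explicit uses of planWithBarrier inside these plan-building chains with logicalPlan. For anyone inspecting plans locally, the public route to what a Dataset carries is its QueryExecution (a generic sketch, independent of the commit's test helpers):

    val df = spark.range(3).toDF("value")
    df.explain(true)                     // parsed, analyzed, optimized, physical plans
    println(df.queryExecution.analyzed)  // just the analyzed logical plan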