Posted to commits@spark.apache.org by li...@apache.org on 2018/07/30 21:05:48 UTC

spark git commit: [SPARK-24865][SQL] Remove AnalysisBarrier addendum

Repository: spark
Updated Branches:
  refs/heads/master d6b7545b5 -> abbb4ab4d


[SPARK-24865][SQL] Remove AnalysisBarrier addendum

## What changes were proposed in this pull request?
I didn't want to pollute the diff in the previous PR, so I left some TODOs behind. This is a follow-up to address those TODOs; the sketch below illustrates the gist of the change.
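
For context, the previous PR had already reduced `planWithBarrier` to a plain alias for `logicalPlan` (the removed line in Dataset.scala is `@transient private[sql] val planWithBarrier = logicalPlan`), so this follow-up deletes the alias and points every call site at `logicalPlan` directly. A minimal, hypothetical before/after sketch — simplified stand-in types, not the real Dataset or Catalyst classes:

```scala
object PlanWithBarrierSketch {
  // Hypothetical stand-ins for Catalyst's LogicalPlan hierarchy.
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan

  class MiniDataset(val logicalPlan: Plan) {
    // Before this patch: a leftover alias, kept only to limit the size of the earlier diff.
    //   @transient private[sql] val planWithBarrier = logicalPlan
    // After this patch: operators build directly on logicalPlan.
    def limit(n: Int): MiniDataset = new MiniDataset(Limit(n, logicalPlan))
  }

  def main(args: Array[String]): Unit = {
    val ds = new MiniDataset(Relation("t")).limit(10)
    println(ds.logicalPlan) // Limit(10,Relation(t))
  }
}
```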

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #21896 from rxin/SPARK-24865-addendum.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abbb4ab4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abbb4ab4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abbb4ab4

Branch: refs/heads/master
Commit: abbb4ab4d8b12ba2d94b16407c0d62ae207ee4fa
Parents: d6b7545
Author: Reynold Xin <rx...@databricks.com>
Authored: Mon Jul 30 14:05:45 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Mon Jul 30 14:05:45 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  1 -
 .../scala/org/apache/spark/sql/Dataset.scala    | 88 ++++++++++----------
 .../spark/sql/RelationalGroupedDataset.scala    | 13 ++-
 .../continuous/ContinuousAggregationSuite.scala |  4 +-
 4 files changed, 50 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/abbb4ab4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9965cd6..1488ede 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -799,7 +799,6 @@ class Analyzer(
           right
         case Some((oldRelation, newRelation)) =>
           val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
-          // TODO(rxin): Why do we need transformUp here?
           right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {

http://git-wip-us.apache.org/repos/asf/spark/blob/abbb4ab4/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d36c8d1..3b0a6d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -195,10 +195,6 @@ class Dataset[T] private[sql](
     }
   }
 
-  // Wraps analyzed logical plans with an analysis barrier so we won't traverse/resolve it again.
-  // TODO(rxin): remove this later.
-  @transient private[sql] val planWithBarrier = logicalPlan
-
   /**
    * Currently [[ExpressionEncoder]] is the only implementation of [[Encoder]], here we turn the
    * passed in encoder to [[ExpressionEncoder]] explicitly, and mark it implicit so that we can use
@@ -427,7 +423,7 @@ class Dataset[T] private[sql](
    */
   @Experimental
   @InterfaceStability.Evolving
-  def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, planWithBarrier)
+  def as[U : Encoder]: Dataset[U] = Dataset[U](sparkSession, logicalPlan)
 
   /**
    * Converts this strongly typed collection of data to generic `DataFrame` with columns renamed.
@@ -681,7 +677,7 @@ class Dataset[T] private[sql](
     require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
       s"delay threshold ($delayThreshold) should not be negative.")
     EliminateEventTimeWatermark(
-      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, planWithBarrier))
+      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
   }
 
   /**
@@ -854,7 +850,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def join(right: Dataset[_]): DataFrame = withPlan {
-    Join(planWithBarrier, right.planWithBarrier, joinType = Inner, None)
+    Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
   }
 
   /**
@@ -932,7 +928,7 @@ class Dataset[T] private[sql](
     // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
     // by creating a new instance for one of the branch.
     val joined = sparkSession.sessionState.executePlan(
-      Join(planWithBarrier, right.planWithBarrier, joinType = JoinType(joinType), None))
+      Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
       .analyzed.asInstanceOf[Join]
 
     withPlan {
@@ -993,7 +989,7 @@ class Dataset[T] private[sql](
     // Trigger analysis so in the case of self-join, the analyzer will clone the plan.
     // After the cloning, left and right side will have distinct expression ids.
     val plan = withPlan(
-      Join(planWithBarrier, right.planWithBarrier, JoinType(joinType), Some(joinExprs.expr)))
+      Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
       .queryExecution.analyzed.asInstanceOf[Join]
 
     // If auto self join alias is disabled, return the plan.
@@ -1002,8 +998,8 @@ class Dataset[T] private[sql](
     }
 
     // If left/right have no output set intersection, return the plan.
-    val lanalyzed = withPlan(this.planWithBarrier).queryExecution.analyzed
-    val ranalyzed = withPlan(right.planWithBarrier).queryExecution.analyzed
+    val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
+    val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
     if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
       return withPlan(plan)
     }
@@ -1040,7 +1036,7 @@ class Dataset[T] private[sql](
    * @since 2.1.0
    */
   def crossJoin(right: Dataset[_]): DataFrame = withPlan {
-    Join(planWithBarrier, right.planWithBarrier, joinType = Cross, None)
+    Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
   }
 
   /**
@@ -1072,8 +1068,8 @@ class Dataset[T] private[sql](
     // etc.
     val joined = sparkSession.sessionState.executePlan(
       Join(
-        this.planWithBarrier,
-        other.planWithBarrier,
+        this.logicalPlan,
+        other.logicalPlan,
         JoinType(joinType),
         Some(condition.expr))).analyzed.asInstanceOf[Join]
 
@@ -1294,7 +1290,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def as(alias: String): Dataset[T] = withTypedPlan {
-    SubqueryAlias(alias, planWithBarrier)
+    SubqueryAlias(alias, logicalPlan)
   }
 
   /**
@@ -1332,7 +1328,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def select(cols: Column*): DataFrame = withPlan {
-    Project(cols.map(_.named), planWithBarrier)
+    Project(cols.map(_.named), logicalPlan)
   }
 
   /**
@@ -1387,8 +1383,8 @@ class Dataset[T] private[sql](
   @InterfaceStability.Evolving
   def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = {
     implicit val encoder = c1.encoder
-    val project = Project(c1.withInputType(exprEnc, planWithBarrier.output).named :: Nil,
-      planWithBarrier)
+    val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil,
+      logicalPlan)
 
     if (encoder.flat) {
       new Dataset[U1](sparkSession, project, encoder)
@@ -1406,8 +1402,8 @@ class Dataset[T] private[sql](
   protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
     val encoders = columns.map(_.encoder)
     val namedColumns =
-      columns.map(_.withInputType(exprEnc, planWithBarrier.output).named)
-    val execution = new QueryExecution(sparkSession, Project(namedColumns, planWithBarrier))
+      columns.map(_.withInputType(exprEnc, logicalPlan.output).named)
+    val execution = new QueryExecution(sparkSession, Project(namedColumns, logicalPlan))
     new Dataset(sparkSession, execution, ExpressionEncoder.tuple(encoders))
   }
 
@@ -1483,7 +1479,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def filter(condition: Column): Dataset[T] = withTypedPlan {
-    Filter(condition.expr, planWithBarrier)
+    Filter(condition.expr, logicalPlan)
   }
 
   /**
@@ -1662,7 +1658,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
-    val inputPlan = planWithBarrier
+    val inputPlan = logicalPlan
     val withGroupingKey = AppendColumns(func, inputPlan)
     val executed = sparkSession.sessionState.executePlan(withGroupingKey)
 
@@ -1808,7 +1804,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def limit(n: Int): Dataset[T] = withTypedPlan {
-    Limit(Literal(n), planWithBarrier)
+    Limit(Literal(n), logicalPlan)
   }
 
   /**
@@ -1931,7 +1927,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
-    Intersect(planWithBarrier, other.planWithBarrier)
+    Intersect(logicalPlan, other.logicalPlan)
   }
 
   /**
@@ -1962,7 +1958,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def except(other: Dataset[T]): Dataset[T] = withSetOperator {
-    Except(planWithBarrier, other.planWithBarrier)
+    Except(logicalPlan, other.logicalPlan)
   }
 
   /**
@@ -2029,7 +2025,7 @@ class Dataset[T] private[sql](
    */
   def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = {
     withTypedPlan {
-      Sample(0.0, fraction, withReplacement, seed, planWithBarrier)
+      Sample(0.0, fraction, withReplacement, seed, logicalPlan)
     }
   }
 
@@ -2071,15 +2067,15 @@ class Dataset[T] private[sql](
     // overlapping splits. To prevent this, we explicitly sort each input partition to make the
     // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
     // from the sort order.
-    val sortOrder = planWithBarrier.output
+    val sortOrder = logicalPlan.output
       .filter(attr => RowOrdering.isOrderable(attr.dataType))
       .map(SortOrder(_, Ascending))
     val plan = if (sortOrder.nonEmpty) {
-      Sort(sortOrder, global = false, planWithBarrier)
+      Sort(sortOrder, global = false, logicalPlan)
     } else {
       // SPARK-12662: If sort order is empty, we materialize the dataset to guarantee determinism
       cache()
-      planWithBarrier
+      logicalPlan
     }
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
@@ -2163,7 +2159,7 @@ class Dataset[T] private[sql](
 
     withPlan {
       Generate(generator, unrequiredChildIndex = Nil, outer = false,
-        qualifier = None, generatorOutput = Nil, planWithBarrier)
+        qualifier = None, generatorOutput = Nil, logicalPlan)
     }
   }
 
@@ -2204,7 +2200,7 @@ class Dataset[T] private[sql](
 
     withPlan {
       Generate(generator, unrequiredChildIndex = Nil, outer = false,
-        qualifier = None, generatorOutput = Nil, planWithBarrier)
+        qualifier = None, generatorOutput = Nil, logicalPlan)
     }
   }
 
@@ -2355,7 +2351,7 @@ class Dataset[T] private[sql](
           u.name, sparkSession.sessionState.analyzer.resolver).getOrElse(u)
       case Column(expr: Expression) => expr
     }
-    val attrs = this.planWithBarrier.output
+    val attrs = this.logicalPlan.output
     val colsAfterDrop = attrs.filter { attr =>
       attr != expression
     }.map(attr => Column(attr))
@@ -2403,7 +2399,7 @@ class Dataset[T] private[sql](
       }
       cols
     }
-    Deduplicate(groupCols, planWithBarrier)
+    Deduplicate(groupCols, logicalPlan)
   }
 
   /**
@@ -2585,7 +2581,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def filter(func: T => Boolean): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, planWithBarrier))
+    withTypedPlan(TypedFilter(func, logicalPlan))
   }
 
   /**
@@ -2599,7 +2595,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def filter(func: FilterFunction[T]): Dataset[T] = {
-    withTypedPlan(TypedFilter(func, planWithBarrier))
+    withTypedPlan(TypedFilter(func, logicalPlan))
   }
 
   /**
@@ -2613,7 +2609,7 @@ class Dataset[T] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def map[U : Encoder](func: T => U): Dataset[U] = withTypedPlan {
-    MapElements[T, U](func, planWithBarrier)
+    MapElements[T, U](func, logicalPlan)
   }
 
   /**
@@ -2628,7 +2624,7 @@ class Dataset[T] private[sql](
   @InterfaceStability.Evolving
   def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
     implicit val uEnc = encoder
-    withTypedPlan(MapElements[T, U](func, planWithBarrier))
+    withTypedPlan(MapElements[T, U](func, logicalPlan))
   }
 
   /**
@@ -2644,7 +2640,7 @@ class Dataset[T] private[sql](
   def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
     new Dataset[U](
       sparkSession,
-      MapPartitions[T, U](func, planWithBarrier),
+      MapPartitions[T, U](func, logicalPlan),
       implicitly[Encoder[U]])
   }
 
@@ -2675,7 +2671,7 @@ class Dataset[T] private[sql](
     val rowEncoder = encoder.asInstanceOf[ExpressionEncoder[Row]]
     Dataset.ofRows(
       sparkSession,
-      MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, planWithBarrier))
+      MapPartitionsInR(func, packageNames, broadcastVars, schema, rowEncoder, logicalPlan))
   }
 
   /**
@@ -2839,7 +2835,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
-    Repartition(numPartitions, shuffle = true, planWithBarrier)
+    Repartition(numPartitions, shuffle = true, logicalPlan)
   }
 
   /**
@@ -2862,7 +2858,7 @@ class Dataset[T] private[sql](
          |For range partitioning use repartitionByRange(...) instead.
        """.stripMargin)
     withTypedPlan {
-      RepartitionByExpression(partitionExprs.map(_.expr), planWithBarrier, numPartitions)
+      RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
     }
   }
 
@@ -2900,7 +2896,7 @@ class Dataset[T] private[sql](
       case expr: Expression => SortOrder(expr, Ascending)
     })
     withTypedPlan {
-      RepartitionByExpression(sortOrder, planWithBarrier, numPartitions)
+      RepartitionByExpression(sortOrder, logicalPlan, numPartitions)
     }
   }
 
@@ -2939,7 +2935,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
-    Repartition(numPartitions, shuffle = false, planWithBarrier)
+    Repartition(numPartitions, shuffle = false, logicalPlan)
   }
 
   /**
@@ -3024,7 +3020,7 @@ class Dataset[T] private[sql](
 
   // Represents the `QueryExecution` used to produce the content of the Dataset as an `RDD`.
   @transient private lazy val rddQueryExecution: QueryExecution = {
-    val deserialized = CatalystSerde.deserialize[T](planWithBarrier)
+    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
     sparkSession.sessionState.executePlan(deserialized)
   }
 
@@ -3150,7 +3146,7 @@ class Dataset[T] private[sql](
       comment = None,
       properties = Map.empty,
       originalText = None,
-      child = planWithBarrier,
+      child = logicalPlan,
       allowExisting = false,
       replace = replace,
       viewType = viewType)
@@ -3363,7 +3359,7 @@ class Dataset[T] private[sql](
       }
     }
     withTypedPlan {
-      Sort(sortOrder, global = global, planWithBarrier)
+      Sort(sortOrder, global = global, logicalPlan)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abbb4ab4/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index b068493..8412219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -62,18 +62,17 @@ class RelationalGroupedDataset protected[sql](
 
     groupType match {
       case RelationalGroupedDataset.GroupByType =>
-        Dataset.ofRows(
-          df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.planWithBarrier))
+        Dataset.ofRows(df.sparkSession, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
       case RelationalGroupedDataset.RollupType =>
         Dataset.ofRows(
-          df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.planWithBarrier))
+          df.sparkSession, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
       case RelationalGroupedDataset.CubeType =>
         Dataset.ofRows(
-          df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.planWithBarrier))
+          df.sparkSession, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
       case RelationalGroupedDataset.PivotType(pivotCol, values) =>
         val aliasedGrps = groupingExprs.map(alias)
         Dataset.ofRows(
-          df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.planWithBarrier))
+          df.sparkSession, Pivot(Some(aliasedGrps), pivotCol, values, aggExprs, df.logicalPlan))
     }
   }
 
@@ -433,7 +432,7 @@ class RelationalGroupedDataset protected[sql](
           df.exprEnc.schema,
           groupingAttributes,
           df.logicalPlan.output,
-          df.planWithBarrier))
+          df.logicalPlan))
   }
 
   /**
@@ -459,7 +458,7 @@ class RelationalGroupedDataset protected[sql](
       case other => Alias(other, other.toString)()
     }
     val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
-    val child = df.planWithBarrier
+    val child = df.logicalPlan
     val project = Project(groupingNamedExpressions ++ child.output, child)
     val output = expr.dataType.asInstanceOf[StructType].toAttributes
     val plan = FlatMapGroupsInPandas(groupingAttributes, expr, output, project)

http://git-wip-us.apache.org/repos/asf/spark/blob/abbb4ab4/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
index 0223812..c5b95fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
@@ -74,7 +74,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
     val df = input.toDF()
       .select('value as 'copy, 'value)
       .where('copy =!= 1)
-      .planWithBarrier
+      .logicalPlan
       .coalesce(1)
       .where('copy =!= 2)
       .agg(max('value))
@@ -95,7 +95,7 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
 
     val df = input.toDF()
       .coalesce(1)
-      .planWithBarrier
+      .logicalPlan
       .coalesce(1)
       .select('value as 'copy, 'value)
       .agg(max('value))

