Posted to commits@spark.apache.org by li...@apache.org on 2017/07/01 04:10:28 UTC

spark git commit: [SPARK-21273][SQL] Propagate logical plan stats using visitor pattern and mixin

Repository: spark
Updated Branches:
  refs/heads/master 61b5df567 -> b1d719e7c


[SPARK-21273][SQL] Propagate logical plan stats using visitor pattern and mixin

## What changes were proposed in this pull request?
We currently implement statistics propagation directly in the logical plan classes. Given that we already have two different implementations, it makes sense to decouple the two and add stats propagation using a mixin. This reduces the coupling between the logical plan and statistics handling.
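
As a concrete illustration, here is a minimal, self-contained sketch of the visitor-plus-mixin decoupling. All names (`Plan`, `Leaf`, `CartesianJoin`, `PlanVisitor`, `PlanStats`, `RowCountVisitor`) are hypothetical stand-ins, not the actual Spark classes:

    // Hypothetical mini plan hierarchy: the nodes carry no estimation
    // logic of their own.
    sealed abstract class Plan extends PlanStats {
      def children: Seq[Plan]
    }
    case class Leaf(rows: BigInt) extends Plan {
      override def children: Seq[Plan] = Nil
    }
    case class CartesianJoin(left: Plan, right: Plan) extends Plan {
      override def children: Seq[Plan] = Seq(left, right)
    }

    // The visitor dispatches on the node type, one method per operator.
    trait PlanVisitor[T] {
      def visit(p: Plan): T = p match {
        case l: Leaf => visitLeaf(l)
        case j: CartesianJoin => visitJoin(j)
      }
      def visitLeaf(p: Leaf): T
      def visitJoin(p: CartesianJoin): T
    }

    // One concrete property: a row-count upper bound, mirroring the
    // cartesian-product default used for joins in this patch.
    object RowCountVisitor extends PlanVisitor[BigInt] {
      override def visitLeaf(p: Leaf): BigInt = p.rows
      override def visitJoin(p: CartesianJoin): BigInt =
        visit(p.left) * visit(p.right)
    }

    // The mixin: every plan gains a cached `stats` accessor, while the
    // computation itself stays in the visitor.
    trait PlanStats { self: Plan =>
      private var statsCache: Option[BigInt] = None
      def stats: BigInt = statsCache.getOrElse {
        statsCache = Some(RowCountVisitor.visit(self))
        statsCache.get
      }
    }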

This can also be a powerful pattern in the future to add additional properties (e.g. constraints).
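
Under the same hypothetical classes as the sketch above, a new derived property is just another visitor; the plan classes themselves stay untouched:

    // A second, independent property over the same plan tree.
    object DepthVisitor extends PlanVisitor[Int] {
      override def visitLeaf(p: Leaf): Int = 1
      override def visitJoin(p: CartesianJoin): Int =
        1 + math.max(visit(p.left), visit(p.right))
    }

    // Usage: both properties computed without modifying Plan.
    val plan = CartesianJoin(Leaf(10), CartesianJoin(Leaf(5), Leaf(2)))
    assert(plan.stats == BigInt(100))       // 10 * 5 * 2
    assert(DepthVisitor.visit(plan) == 3)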

## How was this patch tested?
Should be covered by existing test cases.

Author: Reynold Xin <rx...@databricks.com>

Closes #18479 from rxin/stats-trait.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1d719e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1d719e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1d719e7

Branch: refs/heads/master
Commit: b1d719e7c9faeb5661a7e712b3ecefca56bf356f
Parents: 61b5df5
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri Jun 30 21:10:23 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Jun 30 21:10:23 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |   2 +-
 .../catalyst/plans/logical/LocalRelation.scala  |   5 +-
 .../catalyst/plans/logical/LogicalPlan.scala    |  61 +------
 .../plans/logical/LogicalPlanVisitor.scala      |  87 ++++++++++
 .../plans/logical/basicLogicalOperators.scala   | 128 +--------------
 .../sql/catalyst/plans/logical/hints.scala      |   5 -
 .../statsEstimation/BasicStatsPlanVisitor.scala |  82 ++++++++++
 .../statsEstimation/LogicalPlanStats.scala      |  50 ++++++
 .../SizeInBytesOnlyStatsPlanVisitor.scala       | 163 +++++++++++++++++++
 .../BasicStatsEstimationSuite.scala             |  44 -----
 .../StatsEstimationTestBase.scala               |   2 +-
 .../spark/sql/execution/ExistingRDD.scala       |   4 +-
 .../execution/columnar/InMemoryRelation.scala   |   2 +-
 .../execution/datasources/LogicalRelation.scala |   7 +-
 .../spark/sql/execution/streaming/memory.scala  |   3 +-
 .../PruneFileSourcePartitionsSuite.scala        |   2 +-
 16 files changed, 409 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index da50b0e..9531456 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -438,7 +438,7 @@ case class CatalogRelation(
       case (attr, index) => attr.withExprId(ExprId(index + dataCols.length))
     })
 
-  override def computeStats: Statistics = {
+  override def computeStats(): Statistics = {
     // For data source tables, we will create a `LogicalRelation` and won't call this method; for
     // Hive serde tables, we always generate statistics.
     // TODO: unify the table stats generation.

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index dc2add6..1c986fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -66,9 +66,8 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
     }
   }
 
-  override def computeStats: Statistics =
-    Statistics(sizeInBytes =
-      output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)
+  override def computeStats(): Statistics =
+    Statistics(sizeInBytes = output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length)
 
   def toSQL(inlineTableName: String): String = {
     require(data.nonEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0d30aa7..8649603 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -22,11 +22,16 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.types.StructType
 
 
-abstract class LogicalPlan extends QueryPlan[LogicalPlan] with QueryPlanConstraints with Logging {
+abstract class LogicalPlan
+  extends QueryPlan[LogicalPlan]
+  with LogicalPlanStats
+  with QueryPlanConstraints
+  with Logging {
 
   private var _analyzed: Boolean = false
 
@@ -80,40 +85,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with QueryPlanConstrai
     }
   }
 
-  /** A cache for the estimated statistics, such that it will only be computed once. */
-  private var statsCache: Option[Statistics] = None
-
-  /**
-   * Returns the estimated statistics for the current logical plan node. Under the hood, this
-   * method caches the return value, which is computed based on the configuration passed in the
-   * first time. If the configuration changes, the cache can be invalidated by calling
-   * [[invalidateStatsCache()]].
-   */
-  final def stats: Statistics = statsCache.getOrElse {
-    statsCache = Some(computeStats)
-    statsCache.get
-  }
-
-  /** Invalidates the stats cache. See [[stats]] for more information. */
-  final def invalidateStatsCache(): Unit = {
-    statsCache = None
-    children.foreach(_.invalidateStatsCache())
-  }
-
-  /**
-   * Computes [[Statistics]] for this plan. The default implementation assumes the output
-   * cardinality is the product of all child plan's cardinality, i.e. applies in the case
-   * of cartesian joins.
-   *
-   * [[LeafNode]]s must override this.
-   */
-  protected def computeStats: Statistics = {
-    if (children.isEmpty) {
-      throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
-    }
-    Statistics(sizeInBytes = children.map(_.stats.sizeInBytes).product)
-  }
-
   override def verboseStringWithSuffix: String = {
     super.verboseString + statsCache.map(", " + _.toString).getOrElse("")
   }
@@ -300,6 +271,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with QueryPlanConstrai
 abstract class LeafNode extends LogicalPlan {
   override final def children: Seq[LogicalPlan] = Nil
   override def producedAttributes: AttributeSet = outputSet
+
+  /** Leaf nodes that can survive analysis must define their own statistics. */
+  def computeStats(): Statistics = throw new UnsupportedOperationException
 }
 
 /**
@@ -331,23 +305,6 @@ abstract class UnaryNode extends LogicalPlan {
   }
 
   override protected def validConstraints: Set[Expression] = child.constraints
-
-  override def computeStats: Statistics = {
-    // There should be some overhead in Row object, the size should not be zero when there is
-    // no columns, this help to prevent divide-by-zero error.
-    val childRowSize = child.output.map(_.dataType.defaultSize).sum + 8
-    val outputRowSize = output.map(_.dataType.defaultSize).sum + 8
-    // Assume there will be the same number of rows as child has.
-    var sizeInBytes = (child.stats.sizeInBytes * outputRowSize) / childRowSize
-    if (sizeInBytes == 0) {
-      // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
-      // (product of children).
-      sizeInBytes = 1
-    }
-
-    // Don't propagate rowCount and attributeStats, since they are not estimated here.
-    Statistics(sizeInBytes = sizeInBytes, hints = child.stats.hints)
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
new file mode 100644
index 0000000..b230458
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanVisitor.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+/**
+ * A visitor pattern for traversing a [[LogicalPlan]] tree and computing some properties.
+ */
+trait LogicalPlanVisitor[T] {
+
+  def visit(p: LogicalPlan): T = p match {
+    case p: Aggregate => visitAggregate(p)
+    case p: Distinct => visitDistinct(p)
+    case p: Except => visitExcept(p)
+    case p: Expand => visitExpand(p)
+    case p: Filter => visitFilter(p)
+    case p: Generate => visitGenerate(p)
+    case p: GlobalLimit => visitGlobalLimit(p)
+    case p: Intersect => visitIntersect(p)
+    case p: Join => visitJoin(p)
+    case p: LocalLimit => visitLocalLimit(p)
+    case p: Pivot => visitPivot(p)
+    case p: Project => visitProject(p)
+    case p: Range => visitRange(p)
+    case p: Repartition => visitRepartition(p)
+    case p: RepartitionByExpression => visitRepartitionByExpr(p)
+    case p: Sample => visitSample(p)
+    case p: ScriptTransformation => visitScriptTransform(p)
+    case p: Union => visitUnion(p)
+    case p: ResolvedHint => visitHint(p)
+    case p: LogicalPlan => default(p)
+  }
+
+  def default(p: LogicalPlan): T
+
+  def visitAggregate(p: Aggregate): T
+
+  def visitDistinct(p: Distinct): T
+
+  def visitExcept(p: Except): T
+
+  def visitExpand(p: Expand): T
+
+  def visitFilter(p: Filter): T
+
+  def visitGenerate(p: Generate): T
+
+  def visitGlobalLimit(p: GlobalLimit): T
+
+  def visitHint(p: ResolvedHint): T
+
+  def visitIntersect(p: Intersect): T
+
+  def visitJoin(p: Join): T
+
+  def visitLocalLimit(p: LocalLimit): T
+
+  def visitPivot(p: Pivot): T
+
+  def visitProject(p: Project): T
+
+  def visitRange(p: Range): T
+
+  def visitRepartition(p: Repartition): T
+
+  def visitRepartitionByExpr(p: RepartitionByExpression): T
+
+  def visitSample(p: Sample): T
+
+  def visitScriptTransform(p: ScriptTransformation): T
+
+  def visitUnion(p: Union): T
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index e89caab..0bd3166 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -63,14 +63,6 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
 
   override def validConstraints: Set[Expression] =
     child.constraints.union(getAliasedConstraints(projectList))
-
-  override def computeStats: Statistics = {
-    if (conf.cboEnabled) {
-      ProjectEstimation.estimate(this).getOrElse(super.computeStats)
-    } else {
-      super.computeStats
-    }
-  }
 }
 
 /**
@@ -137,14 +129,6 @@ case class Filter(condition: Expression, child: LogicalPlan)
       .filterNot(SubqueryExpression.hasCorrelatedSubquery)
     child.constraints.union(predicates.toSet)
   }
-
-  override def computeStats: Statistics = {
-    if (conf.cboEnabled) {
-      FilterEstimation(this).estimate.getOrElse(super.computeStats)
-    } else {
-      super.computeStats
-    }
-  }
 }
 
 abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
@@ -190,15 +174,6 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation
       Some(children.flatMap(_.maxRows).min)
     }
   }
-
-  override def computeStats: Statistics = {
-    val leftSize = left.stats.sizeInBytes
-    val rightSize = right.stats.sizeInBytes
-    val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
-    Statistics(
-      sizeInBytes = sizeInBytes,
-      hints = left.stats.hints.resetForJoin())
-  }
 }
 
 case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
@@ -207,10 +182,6 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(le
   override def output: Seq[Attribute] = left.output
 
   override protected def validConstraints: Set[Expression] = leftConstraints
-
-  override def computeStats: Statistics = {
-    left.stats.copy()
-  }
 }
 
 /** Factory for constructing new `Union` nodes. */
@@ -247,11 +218,6 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
     children.length > 1 && childrenResolved && allChildrenCompatible
   }
 
-  override def computeStats: Statistics = {
-    val sizeInBytes = children.map(_.stats.sizeInBytes).sum
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-
   /**
    * Maps the constraints containing a given (original) sequence of attributes to those with a
    * given (reference) sequence of attributes. Given the nature of union, we expect that the
@@ -355,25 +321,6 @@ case class Join(
     case UsingJoin(_, _) => false
     case _ => resolvedExceptNatural
   }
-
-  override def computeStats: Statistics = {
-    def simpleEstimation: Statistics = joinType match {
-      case LeftAnti | LeftSemi =>
-        // LeftSemi and LeftAnti won't ever be bigger than left
-        left.stats
-      case _ =>
-        // Make sure we don't propagate isBroadcastable in other joins, because
-        // they could explode the size.
-        val stats = super.computeStats
-        stats.copy(hints = stats.hints.resetForJoin())
-    }
-
-    if (conf.cboEnabled) {
-      JoinEstimation.estimate(this).getOrElse(simpleEstimation)
-    } else {
-      simpleEstimation
-    }
-  }
 }
 
 /**
@@ -522,14 +469,13 @@ case class Range(
 
   override def newInstance(): Range = copy(output = output.map(_.newInstance()))
 
-  override def computeStats: Statistics = {
-    val sizeInBytes = LongType.defaultSize * numElements
-    Statistics( sizeInBytes = sizeInBytes )
-  }
-
   override def simpleString: String = {
     s"Range ($start, $end, step=$step, splits=$numSlices)"
   }
+
+  override def computeStats(): Statistics = {
+    Statistics(sizeInBytes = LongType.defaultSize * numElements)
+  }
 }
 
 case class Aggregate(
@@ -554,25 +500,6 @@ case class Aggregate(
     val nonAgg = aggregateExpressions.filter(_.find(_.isInstanceOf[AggregateExpression]).isEmpty)
     child.constraints.union(getAliasedConstraints(nonAgg))
   }
-
-  override def computeStats: Statistics = {
-    def simpleEstimation: Statistics = {
-      if (groupingExpressions.isEmpty) {
-        Statistics(
-          sizeInBytes = EstimationUtils.getOutputSize(output, outputRowCount = 1),
-          rowCount = Some(1),
-          hints = child.stats.hints)
-      } else {
-        super.computeStats
-      }
-    }
-
-    if (conf.cboEnabled) {
-      AggregateEstimation.estimate(this).getOrElse(simpleEstimation)
-    } else {
-      simpleEstimation
-    }
-  }
 }
 
 case class Window(
@@ -671,11 +598,6 @@ case class Expand(
   override def references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
-  override def computeStats: Statistics = {
-    val sizeInBytes = super.computeStats.sizeInBytes * projections.length
-    Statistics(sizeInBytes = sizeInBytes)
-  }
-
   // This operator can reuse attributes (for example making them null when doing a roll up) so
   // the constraints of the child may no longer be valid.
   override protected def validConstraints: Set[Expression] = Set.empty[Expression]
@@ -742,16 +664,6 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
       case _ => None
     }
   }
-  override def computeStats: Statistics = {
-    val limit = limitExpr.eval().asInstanceOf[Int]
-    val childStats = child.stats
-    val rowCount: BigInt = childStats.rowCount.map(_.min(limit)).getOrElse(limit)
-    // Don't propagate column stats, because we don't know the distribution after a limit operation
-    Statistics(
-      sizeInBytes = EstimationUtils.getOutputSize(output, rowCount, childStats.attributeStats),
-      rowCount = Some(rowCount),
-      hints = childStats.hints)
-  }
 }
 
 case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
@@ -762,24 +674,6 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
       case _ => None
     }
   }
-  override def computeStats: Statistics = {
-    val limit = limitExpr.eval().asInstanceOf[Int]
-    val childStats = child.stats
-    if (limit == 0) {
-      // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
-      // (product of children).
-      Statistics(
-        sizeInBytes = 1,
-        rowCount = Some(0),
-        hints = childStats.hints)
-    } else {
-      // The output row count of LocalLimit should be the sum of row counts from each partition.
-      // However, since the number of partitions is not available here, we just use statistics of
-      // the child. Because the distribution after a limit operation is unknown, we do not propagate
-      // the column stats.
-      childStats.copy(attributeStats = AttributeMap(Nil))
-    }
-  }
 }
 
 /**
@@ -828,18 +722,6 @@ case class Sample(
   }
 
   override def output: Seq[Attribute] = child.output
-
-  override def computeStats: Statistics = {
-    val ratio = upperBound - lowerBound
-    val childStats = child.stats
-    var sizeInBytes = EstimationUtils.ceil(BigDecimal(childStats.sizeInBytes) * ratio)
-    if (sizeInBytes == 0) {
-      sizeInBytes = 1
-    }
-    val sampledRowCount = childStats.rowCount.map(c => EstimationUtils.ceil(BigDecimal(c) * ratio))
-    // Don't propagate column stats, because we don't know the distribution after a sample operation
-    Statistics(sizeInBytes, sampledRowCount, hints = childStats.hints)
-  }
 }
 
 /**
@@ -893,7 +775,7 @@ case class RepartitionByExpression(
 case object OneRowRelation extends LeafNode {
   override def maxRows: Option[Long] = Some(1)
   override def output: Seq[Attribute] = Nil
-  override def computeStats: Statistics = Statistics(sizeInBytes = 1)
+  override def computeStats(): Statistics = Statistics(sizeInBytes = 1)
 }
 
 /** A logical plan for `dropDuplicates`. */

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 8479c70..29a4352 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -42,11 +42,6 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo())
   override def output: Seq[Attribute] = child.output
 
   override lazy val canonicalized: LogicalPlan = child.canonicalized
-
-  override def computeStats: Statistics = {
-    val stats = child.stats
-    stats.copy(hints = hints)
-  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
new file mode 100644
index 0000000..93908b0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/BasicStatsPlanVisitor.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.LongType
+
+/**
+ * A [[LogicalPlanVisitor]] that computes the statistics used in a cost-based optimizer.
+ */
+object BasicStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
+
+  /** Falls back to the estimation computed by [[SizeInBytesOnlyStatsPlanVisitor]]. */
+  private def fallback(p: LogicalPlan): Statistics = SizeInBytesOnlyStatsPlanVisitor.visit(p)
+
+  override def default(p: LogicalPlan): Statistics = fallback(p)
+
+  override def visitAggregate(p: Aggregate): Statistics = {
+    AggregateEstimation.estimate(p).getOrElse(fallback(p))
+  }
+
+  override def visitDistinct(p: Distinct): Statistics = fallback(p)
+
+  override def visitExcept(p: Except): Statistics = fallback(p)
+
+  override def visitExpand(p: Expand): Statistics = fallback(p)
+
+  override def visitFilter(p: Filter): Statistics = {
+    FilterEstimation(p).estimate.getOrElse(fallback(p))
+  }
+
+  override def visitGenerate(p: Generate): Statistics = fallback(p)
+
+  override def visitGlobalLimit(p: GlobalLimit): Statistics = fallback(p)
+
+  override def visitHint(p: ResolvedHint): Statistics = fallback(p)
+
+  override def visitIntersect(p: Intersect): Statistics = fallback(p)
+
+  override def visitJoin(p: Join): Statistics = {
+    JoinEstimation.estimate(p).getOrElse(fallback(p))
+  }
+
+  override def visitLocalLimit(p: LocalLimit): Statistics = fallback(p)
+
+  override def visitPivot(p: Pivot): Statistics = fallback(p)
+
+  override def visitProject(p: Project): Statistics = {
+    ProjectEstimation.estimate(p).getOrElse(fallback(p))
+  }
+
+  override def visitRange(p: logical.Range): Statistics = {
+    val sizeInBytes = LongType.defaultSize * p.numElements
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+
+  override def visitRepartition(p: Repartition): Statistics = fallback(p)
+
+  override def visitRepartitionByExpr(p: RepartitionByExpression): Statistics = fallback(p)
+
+  override def visitSample(p: Sample): Statistics = fallback(p)
+
+  override def visitScriptTransform(p: ScriptTransformation): Statistics = fallback(p)
+
+  override def visitUnion(p: Union): Statistics = fallback(p)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/LogicalPlanStats.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/LogicalPlanStats.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/LogicalPlanStats.scala
new file mode 100644
index 0000000..8660d93
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/LogicalPlanStats.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import org.apache.spark.sql.catalyst.plans.logical._
+
+/**
+ * A trait to add statistics propagation to [[LogicalPlan]].
+ */
+trait LogicalPlanStats { self: LogicalPlan =>
+
+  /**
+   * Returns the estimated statistics for the current logical plan node. Under the hood, this
+   * method caches the return value, which is computed based on the configuration in effect the
+   * first time it is invoked. If the configuration changes, the cache can be invalidated by
+   * calling [[invalidateStatsCache()]].
+   */
+  def stats: Statistics = statsCache.getOrElse {
+    if (conf.cboEnabled) {
+      statsCache = Option(BasicStatsPlanVisitor.visit(self))
+    } else {
+      statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
+    }
+    statsCache.get
+  }
+
+  /** A cache for the estimated statistics, such that it will only be computed once. */
+  protected var statsCache: Option[Statistics] = None
+
+  /** Invalidates the stats cache. See [[stats]] for more information. */
+  final def invalidateStatsCache(): Unit = {
+    statsCache = None
+    children.foreach(_.invalidateStatsCache())
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
new file mode 100644
index 0000000..559f120
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import org.apache.spark.sql.catalyst.expressions.AttributeMap
+import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical._
+
+/**
+ * A [[LogicalPlanVisitor]] that computes a single dimension of plan stats: size in bytes.
+ */
+object SizeInBytesOnlyStatsPlanVisitor extends LogicalPlanVisitor[Statistics] {
+
+  /**
+   * A default, commonly used estimation for unary nodes: we assume the output row count equals
+   * the input row count, and compute sizes based on the column types.
+   */
+  private def visitUnaryNode(p: UnaryNode): Statistics = {
+    // There should be some overhead for the Row object; the size should not be zero when there
+    // are no columns. This helps prevent divide-by-zero errors.
+    val childRowSize = p.child.output.map(_.dataType.defaultSize).sum + 8
+    val outputRowSize = p.output.map(_.dataType.defaultSize).sum + 8
+    // Assume there will be the same number of rows as child has.
+    var sizeInBytes = (p.child.stats.sizeInBytes * outputRowSize) / childRowSize
+    if (sizeInBytes == 0) {
+      // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
+      // (product of children).
+      sizeInBytes = 1
+    }
+
+    // Don't propagate rowCount and attributeStats, since they are not estimated here.
+    Statistics(sizeInBytes = sizeInBytes, hints = p.child.stats.hints)
+  }
+
+  /**
+   * For a leaf node, use its computeStats(). For other nodes, we assume the size in bytes is
+   * the product of all of the children's, matching the cartesian-join default.
+   */
+  override def default(p: LogicalPlan): Statistics = p match {
+    case p: LeafNode => p.computeStats()
+    case _: LogicalPlan => Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).product)
+  }
+
+  override def visitAggregate(p: Aggregate): Statistics = {
+    if (p.groupingExpressions.isEmpty) {
+      Statistics(
+        sizeInBytes = EstimationUtils.getOutputSize(p.output, outputRowCount = 1),
+        rowCount = Some(1),
+        hints = p.child.stats.hints)
+    } else {
+      visitUnaryNode(p)
+    }
+  }
+
+  override def visitDistinct(p: Distinct): Statistics = default(p)
+
+  override def visitExcept(p: Except): Statistics = p.left.stats.copy()
+
+  override def visitExpand(p: Expand): Statistics = {
+    val sizeInBytes = visitUnaryNode(p).sizeInBytes * p.projections.length
+    Statistics(sizeInBytes = sizeInBytes)
+  }
+
+  override def visitFilter(p: Filter): Statistics = visitUnaryNode(p)
+
+  override def visitGenerate(p: Generate): Statistics = default(p)
+
+  override def visitGlobalLimit(p: GlobalLimit): Statistics = {
+    val limit = p.limitExpr.eval().asInstanceOf[Int]
+    val childStats = p.child.stats
+    val rowCount: BigInt = childStats.rowCount.map(_.min(limit)).getOrElse(limit)
+    // Don't propagate column stats, because we don't know the distribution after limit
+    Statistics(
+      sizeInBytes = EstimationUtils.getOutputSize(p.output, rowCount, childStats.attributeStats),
+      rowCount = Some(rowCount),
+      hints = childStats.hints)
+  }
+
+  override def visitHint(p: ResolvedHint): Statistics = p.child.stats.copy(hints = p.hints)
+
+  override def visitIntersect(p: Intersect): Statistics = {
+    val leftSize = p.left.stats.sizeInBytes
+    val rightSize = p.right.stats.sizeInBytes
+    val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize
+    Statistics(
+      sizeInBytes = sizeInBytes,
+      hints = p.left.stats.hints.resetForJoin())
+  }
+
+  override def visitJoin(p: Join): Statistics = {
+    p.joinType match {
+      case LeftAnti | LeftSemi =>
+        // LeftSemi and LeftAnti won't ever be bigger than left
+        p.left.stats
+      case _ =>
+        // Make sure we don't propagate isBroadcastable in other joins, because
+        // they could explode the size.
+        val stats = default(p)
+        stats.copy(hints = stats.hints.resetForJoin())
+    }
+  }
+
+  override def visitLocalLimit(p: LocalLimit): Statistics = {
+    val limit = p.limitExpr.eval().asInstanceOf[Int]
+    val childStats = p.child.stats
+    if (limit == 0) {
+      // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
+      // (product of children).
+      Statistics(sizeInBytes = 1, rowCount = Some(0), hints = childStats.hints)
+    } else {
+      // The output row count of LocalLimit should be the sum of row counts from each partition.
+      // However, since the number of partitions is not available here, we just use statistics of
+      // the child. Because the distribution after a limit operation is unknown, we do not propagate
+      // the column stats.
+      childStats.copy(attributeStats = AttributeMap(Nil))
+    }
+  }
+
+  override def visitPivot(p: Pivot): Statistics = default(p)
+
+  override def visitProject(p: Project): Statistics = visitUnaryNode(p)
+
+  override def visitRange(p: logical.Range): Statistics = {
+    p.computeStats()
+  }
+
+  override def visitRepartition(p: Repartition): Statistics = default(p)
+
+  override def visitRepartitionByExpr(p: RepartitionByExpression): Statistics = default(p)
+
+  override def visitSample(p: Sample): Statistics = {
+    val ratio = p.upperBound - p.lowerBound
+    var sizeInBytes = EstimationUtils.ceil(BigDecimal(p.child.stats.sizeInBytes) * ratio)
+    if (sizeInBytes == 0) {
+      sizeInBytes = 1
+    }
+    val sampleRows = p.child.stats.rowCount.map(c => EstimationUtils.ceil(BigDecimal(c) * ratio))
+    // Don't propagate column stats, because we don't know the distribution after a sample operation
+    Statistics(sizeInBytes, sampleRows, hints = p.child.stats.hints)
+  }
+
+  override def visitScriptTransform(p: ScriptTransformation): Statistics = default(p)
+
+  override def visitUnion(p: Union): Statistics = {
+    Statistics(sizeInBytes = p.children.map(_.stats.sizeInBytes).sum)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 912c5fe..31a8cbd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -77,37 +77,6 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
     checkStats(globalLimit, stats)
   }
 
-  test("sample estimation") {
-    val sample = Sample(0.0, 0.5, withReplacement = false, (math.random * 1000).toLong, plan)
-    checkStats(sample, Statistics(sizeInBytes = 60, rowCount = Some(5)))
-
-    // Child doesn't have rowCount in stats
-    val childStats = Statistics(sizeInBytes = 120)
-    val childPlan = DummyLogicalPlan(childStats, childStats)
-    val sample2 =
-      Sample(0.0, 0.11, withReplacement = false, (math.random * 1000).toLong, childPlan)
-    checkStats(sample2, Statistics(sizeInBytes = 14))
-  }
-
-  test("estimate statistics when the conf changes") {
-    val expectedDefaultStats =
-      Statistics(
-        sizeInBytes = 40,
-        rowCount = Some(10),
-        attributeStats = AttributeMap(Seq(
-          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))))
-    val expectedCboStats =
-      Statistics(
-        sizeInBytes = 4,
-        rowCount = Some(1),
-        attributeStats = AttributeMap(Seq(
-          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))))
-
-    val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
-    checkStats(
-      plan, expectedStatsCboOn = expectedCboStats, expectedStatsCboOff = expectedDefaultStats)
-  }
-
   /** Check estimated stats when cbo is turned on/off. */
   private def checkStats(
       plan: LogicalPlan,
@@ -132,16 +101,3 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase {
   private def checkStats(plan: LogicalPlan, expectedStats: Statistics): Unit =
     checkStats(plan, expectedStats, expectedStats)
 }
-
-/**
- * This class is used for unit-testing the cbo switch, it mimics a logical plan which computes
- * a simple statistics or a cbo estimated statistics based on the conf.
- */
-private case class DummyLogicalPlan(
-    defaultStats: Statistics,
-    cboStats: Statistics) extends LogicalPlan {
-  override def output: Seq[Attribute] = Nil
-  override def children: Seq[LogicalPlan] = Nil
-  override def computeStats: Statistics =
-    if (conf.cboEnabled) cboStats else defaultStats
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
index eaa33e4..31dea2e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -65,7 +65,7 @@ case class StatsTestPlan(
     attributeStats: AttributeMap[ColumnStat],
     size: Option[BigInt] = None) extends LeafNode {
   override def output: Seq[Attribute] = outputList
-  override def computeStats: Statistics = Statistics(
+  override def computeStats(): Statistics = Statistics(
     // If sizeInBytes is useless in testing, we just use a fake value
     sizeInBytes = size.getOrElse(Int.MaxValue),
     rowCount = Some(rowCount),

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 66f66a2..dcb918e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -88,7 +88,7 @@ case class ExternalRDD[T](
 
   override protected def stringArgs: Iterator[Any] = Iterator(output)
 
-  @transient override def computeStats: Statistics = Statistics(
+  override def computeStats(): Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
@@ -156,7 +156,7 @@ case class LogicalRDD(
 
   override protected def stringArgs: Iterator[Any] = Iterator(output)
 
-  @transient override def computeStats: Statistics = Statistics(
+  override def computeStats(): Statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
     sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 2972132..39cf8fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -69,7 +69,7 @@ case class InMemoryRelation(
 
   @transient val partitionStatistics = new PartitionStatistics(output)
 
-  override def computeStats: Statistics = {
+  override def computeStats(): Statistics = {
     if (batchStats.value == 0L) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 6ba190b..699f1ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -48,9 +48,10 @@ case class LogicalRelation(
     output = output.map(QueryPlan.normalizeExprId(_, output)),
     catalogTable = None)
 
-  @transient override def computeStats: Statistics = {
-    catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
-      Statistics(sizeInBytes = relation.sizeInBytes))
+  override def computeStats(): Statistics = {
+    catalogTable
+      .flatMap(_.stats.map(_.toPlanStats(output)))
+      .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
   }
 
   /** Used to lookup original attribute capitalization */

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 4979873..587ae2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -230,6 +230,5 @@ case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode
 
   private val sizePerRow = sink.schema.toAttributes.map(_.dataType.defaultSize).sum
 
-  override def computeStats: Statistics =
-    Statistics(sizePerRow * sink.allData.size)
+  override def computeStats(): Statistics = Statistics(sizePerRow * sink.allData.size)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 3a724aa..9438418 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -86,7 +86,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
         case relation: LogicalRelation => relation
       }
       assert(relations.size === 1, s"Size wrong for:\n ${df.queryExecution}")
-      val size2 = relations(0).computeStats.sizeInBytes
+      val size2 = relations(0).stats.sizeInBytes
       assert(size2 == relations(0).catalogTable.get.stats.get.sizeInBytes)
       assert(size2 < tableStats.get.sizeInBytes)
     }

