You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/24 05:54:46 UTC

[spark] branch master updated: [SPARK-25196][SPARK-27251][SQL][FOLLOWUP] Add synchronized for InMemoryRelation.statsOfPlanToCache

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 01e6305  [SPARK-25196][SPARK-27251][SQL][FOLLOWUP] Add synchronized for InMemoryRelation.statsOfPlanToCache
01e6305 is described below

commit 01e63053df3456989fc8bc7fe08cd4a713aa60e5
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Sat Mar 23 22:54:27 2019 -0700

    [SPARK-25196][SPARK-27251][SQL][FOLLOWUP] Add synchronized for InMemoryRelation.statsOfPlanToCache
    
    ## What changes were proposed in this pull request?
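    This is a follow-up of #24047. To follow the lock semantics of `CacheManager.cachedData`, this PR wraps the `statsOfPlanToCache` update in `synchronized`. The update logic moves from `CacheManager` into a new `InMemoryRelation.updateStats` method, and `statsOfPlanToCache` becomes a `@volatile` member of `InMemoryRelation` instead of a second-parameter-list constructor argument.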
    
    ## How was this patch tested?
    Pass Jenkins
    
    Closes #24178 from maropu/SPARK-24047-FOLLOWUP.
    
    Authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  |  9 +---
 .../sql/execution/columnar/InMemoryRelation.scala  | 54 +++++++++++++++-------
 .../columnar/InMemoryColumnarQuerySuite.scala      |  2 +-
 3 files changed, 39 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 612f693..0145478 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import scala.collection.immutable.IndexedSeq
 
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -163,12 +161,7 @@ class CacheManager extends Logging {
     val relation = cachedData.cachedRepresentation
     val (rowCount, newColStats) =
       CommandUtils.computeColumnStats(sparkSession, relation, column)
-    val oldStats = relation.statsOfPlanToCache
-    val newStats = oldStats.copy(
-      rowCount = Some(rowCount),
-      attributeStats = AttributeMap((oldStats.attributeStats ++ newColStats).toSeq)
-    )
-    relation.statsOfPlanToCache = newStats
+    relation.updateStats(rowCount, newColStats)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index fcc3468..1e4453f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
@@ -145,30 +145,43 @@ object InMemoryRelation {
       tableName: Option[String],
       logicalPlan: LogicalPlan): InMemoryRelation = {
     val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)
-    new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)(
-      statsOfPlanToCache = logicalPlan.stats)
+    val relation = new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)
+    relation.statsOfPlanToCache = logicalPlan.stats
+    relation
   }
 
   def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
-    new InMemoryRelation(cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)(
-      statsOfPlanToCache = logicalPlan.stats)
+    val relation = new InMemoryRelation(
+      cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)
+    relation.statsOfPlanToCache = logicalPlan.stats
+    relation
+  }
+
+  def apply(
+      output: Seq[Attribute],
+      cacheBuilder: CachedRDDBuilder,
+      outputOrdering: Seq[SortOrder],
+      statsOfPlanToCache: Statistics): InMemoryRelation = {
+    val relation = InMemoryRelation(output, cacheBuilder, outputOrdering)
+    relation.statsOfPlanToCache = statsOfPlanToCache
+    relation
   }
 }
 
 case class InMemoryRelation(
     output: Seq[Attribute],
     @transient cacheBuilder: CachedRDDBuilder,
-    override val outputOrdering: Seq[SortOrder])(
-    @volatile var statsOfPlanToCache: Statistics)
+    override val outputOrdering: Seq[SortOrder])
   extends logical.LeafNode with MultiInstanceRelation {
 
+  @volatile var statsOfPlanToCache: Statistics = null
+
   override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
 
   override def doCanonicalize(): logical.LogicalPlan =
     copy(output = output.map(QueryPlan.normalizeExprId(_, cachedPlan.output)),
       cacheBuilder,
-      outputOrdering)(
-      statsOfPlanToCache)
+      outputOrdering)
 
   override def producedAttributes: AttributeSet = outputSet
 
@@ -176,6 +189,16 @@ case class InMemoryRelation(
 
   def cachedPlan: SparkPlan = cacheBuilder.cachedPlan
 
+  private[sql] def updateStats(
+      rowCount: Long,
+      newColStats: Map[Attribute, ColumnStat]): Unit = this.synchronized {
+    val newStats = statsOfPlanToCache.copy(
+      rowCount = Some(rowCount),
+      attributeStats = AttributeMap((statsOfPlanToCache.attributeStats ++ newColStats).toSeq)
+    )
+    statsOfPlanToCache = newStats
+  }
+
   override def computeStats(): Statistics = {
     if (cacheBuilder.sizeInBytesStats.value == 0L) {
       // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
@@ -185,20 +208,17 @@ case class InMemoryRelation(
     }
   }
 
-  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
-    InMemoryRelation(newOutput, cacheBuilder, outputOrdering)(statsOfPlanToCache)
-  }
+  def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
+    InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
 
   override def newInstance(): this.type = {
-    new InMemoryRelation(
+    InMemoryRelation(
       output.map(_.newInstance()),
       cacheBuilder,
-      outputOrdering)(
-        statsOfPlanToCache).asInstanceOf[this.type]
+      outputOrdering,
+      statsOfPlanToCache).asInstanceOf[this.type]
   }
 
-  override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
-
   override def simpleString(maxFields: Int): String =
     s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 1c0d603..0a1141c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -488,7 +488,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   test("SPARK-25727 - otherCopyArgs in InMemoryRelation does not include outputOrdering") {
     val data = Seq(100).toDF("count").cache()
     val json = data.queryExecution.optimizedPlan.toJSON
-    assert(json.contains("outputOrdering") && json.contains("statsOfPlanToCache"))
+    assert(json.contains("outputOrdering"))
   }
 
   test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org