You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/24 05:54:46 UTC
[spark] branch master updated:
[SPARK-25196][SPARK-27251][SQL][FOLLOWUP] Add synchronized for
InMemoryRelation.statsOfPlanToCache
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 01e6305 [SPARK-25196][SPARK-27251][SQL][FOLLOWUP] Add synchronized for InMemoryRelation.statsOfPlanToCache
01e6305 is described below
commit 01e63053df3456989fc8bc7fe08cd4a713aa60e5
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Sat Mar 23 22:54:27 2019 -0700
[SPARK-25196][SPARK-27251][SQL][FOLLOWUP] Add synchronized for InMemoryRelation.statsOfPlanToCache
## What changes were proposed in this pull request?
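This is a follow-up to #24047. To follow the `CacheManager.cachedData` lock semantics, this PR wraps the `statsOfPlanToCache` update in `synchronized`. As part of this, `statsOfPlanToCache` moves out of the case class's second parameter list into a mutable field, and a new `InMemoryRelation.apply` overload accepts it explicitly.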
## How was this patch tested?
Passed Jenkins.
Closes #24178 from maropu/SPARK-24047-FOLLOWUP.
Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../apache/spark/sql/execution/CacheManager.scala | 9 +---
.../sql/execution/columnar/InMemoryRelation.scala | 54 +++++++++++++++-------
.../columnar/InMemoryColumnarQuerySuite.scala | 2 +-
3 files changed, 39 insertions(+), 26 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 612f693..0145478 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
import scala.collection.immutable.IndexedSeq
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -163,12 +161,7 @@ class CacheManager extends Logging {
val relation = cachedData.cachedRepresentation
val (rowCount, newColStats) =
CommandUtils.computeColumnStats(sparkSession, relation, column)
- val oldStats = relation.statsOfPlanToCache
- val newStats = oldStats.copy(
- rowCount = Some(rowCount),
- attributeStats = AttributeMap((oldStats.attributeStats ++ newColStats).toSeq)
- )
- relation.statsOfPlanToCache = newStats
+ relation.updateStats(rowCount, newColStats)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index fcc3468..1e4453f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.storage.StorageLevel
@@ -145,30 +145,43 @@ object InMemoryRelation {
tableName: Option[String],
logicalPlan: LogicalPlan): InMemoryRelation = {
val cacheBuilder = CachedRDDBuilder(useCompression, batchSize, storageLevel, child, tableName)
- new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)(
- statsOfPlanToCache = logicalPlan.stats)
+ val relation = new InMemoryRelation(child.output, cacheBuilder, logicalPlan.outputOrdering)
+ relation.statsOfPlanToCache = logicalPlan.stats
+ relation
}
def apply(cacheBuilder: CachedRDDBuilder, logicalPlan: LogicalPlan): InMemoryRelation = {
- new InMemoryRelation(cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)(
- statsOfPlanToCache = logicalPlan.stats)
+ val relation = new InMemoryRelation(
+ cacheBuilder.cachedPlan.output, cacheBuilder, logicalPlan.outputOrdering)
+ relation.statsOfPlanToCache = logicalPlan.stats
+ relation
+ }
+
+ def apply(
+ output: Seq[Attribute],
+ cacheBuilder: CachedRDDBuilder,
+ outputOrdering: Seq[SortOrder],
+ statsOfPlanToCache: Statistics): InMemoryRelation = {
+ val relation = InMemoryRelation(output, cacheBuilder, outputOrdering)
+ relation.statsOfPlanToCache = statsOfPlanToCache
+ relation
}
}
case class InMemoryRelation(
output: Seq[Attribute],
@transient cacheBuilder: CachedRDDBuilder,
- override val outputOrdering: Seq[SortOrder])(
- @volatile var statsOfPlanToCache: Statistics)
+ override val outputOrdering: Seq[SortOrder])
extends logical.LeafNode with MultiInstanceRelation {
+ @volatile var statsOfPlanToCache: Statistics = null
+
override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
override def doCanonicalize(): logical.LogicalPlan =
copy(output = output.map(QueryPlan.normalizeExprId(_, cachedPlan.output)),
cacheBuilder,
- outputOrdering)(
- statsOfPlanToCache)
+ outputOrdering)
override def producedAttributes: AttributeSet = outputSet
@@ -176,6 +189,16 @@ case class InMemoryRelation(
def cachedPlan: SparkPlan = cacheBuilder.cachedPlan
+ private[sql] def updateStats(
+ rowCount: Long,
+ newColStats: Map[Attribute, ColumnStat]): Unit = this.synchronized {
+ val newStats = statsOfPlanToCache.copy(
+ rowCount = Some(rowCount),
+ attributeStats = AttributeMap((statsOfPlanToCache.attributeStats ++ newColStats).toSeq)
+ )
+ statsOfPlanToCache = newStats
+ }
+
override def computeStats(): Statistics = {
if (cacheBuilder.sizeInBytesStats.value == 0L) {
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
@@ -185,20 +208,17 @@ case class InMemoryRelation(
}
}
- def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
- InMemoryRelation(newOutput, cacheBuilder, outputOrdering)(statsOfPlanToCache)
- }
+ def withOutput(newOutput: Seq[Attribute]): InMemoryRelation =
+ InMemoryRelation(newOutput, cacheBuilder, outputOrdering, statsOfPlanToCache)
override def newInstance(): this.type = {
- new InMemoryRelation(
+ InMemoryRelation(
output.map(_.newInstance()),
cacheBuilder,
- outputOrdering)(
- statsOfPlanToCache).asInstanceOf[this.type]
+ outputOrdering,
+ statsOfPlanToCache).asInstanceOf[this.type]
}
- override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
-
override def simpleString(maxFields: Int): String =
s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 1c0d603..0a1141c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -488,7 +488,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-25727 - otherCopyArgs in InMemoryRelation does not include outputOrdering") {
val data = Seq(100).toDF("count").cache()
val json = data.queryExecution.optimizedPlan.toJSON
- assert(json.contains("outputOrdering") && json.contains("statsOfPlanToCache"))
+ assert(json.contains("outputOrdering"))
}
test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org