Posted to commits@spark.apache.org by yh...@apache.org on 2016/09/20 18:54:01 UTC
spark git commit: [SPARK-17549][SQL] Revert "[] Only collect table size stat in driver for cached relation."
Repository: spark
Updated Branches:
refs/heads/master a6aade004 -> 9ac68dbc5
[SPARK-17549][SQL] Revert "[] Only collect table size stat in driver for cached relation."
This reverts commit 39e2bad6a866d27c3ca594d15e574a1da3ee84cc because of the problem mentioned at https://issues.apache.org/jira/browse/SPARK-17549?focusedCommentId=15505060&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15505060
Author: Yin Huai <yh...@databricks.com>
Closes #15157 from yhuai/revert-SPARK-17549.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ac68dbc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ac68dbc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ac68dbc
Branch: refs/heads/master
Commit: 9ac68dbc5720026ea92acc61d295ca64d0d3d132
Parents: a6aade0
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Sep 20 11:53:57 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Sep 20 11:53:57 2016 -0700
----------------------------------------------------------------------
.../execution/columnar/InMemoryRelation.scala | 24 +++++++++++++++-----
.../columnar/InMemoryColumnarQuerySuite.scala | 14 ------------
2 files changed, 18 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9ac68dbc/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 56bd5c1..479934a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.columnar
+import scala.collection.JavaConverters._
+
import org.apache.commons.lang3.StringUtils
import org.apache.spark.network.util.JavaUtils
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.CollectionAccumulator
object InMemoryRelation {
@@ -61,7 +63,8 @@ case class InMemoryRelation(
@transient child: SparkPlan,
tableName: Option[String])(
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
- val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
+ val batchStats: CollectionAccumulator[InternalRow] =
+ child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
extends logical.LeafNode with MultiInstanceRelation {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -71,12 +74,21 @@ case class InMemoryRelation(
@transient val partitionStatistics = new PartitionStatistics(output)
override lazy val statistics: Statistics = {
- if (batchStats.value == 0L) {
+ if (batchStats.value.isEmpty) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
// available, return the default statistics.
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
} else {
- Statistics(sizeInBytes = batchStats.value.longValue)
+ // Underlying columnar RDD has been materialized, required information has also been
+ // collected via the `batchStats` accumulator.
+ val sizeOfRow: Expression =
+ BindReferences.bindReference(
+ output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+ partitionStatistics.schema)
+
+ val sizeInBytes =
+ batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
+ Statistics(sizeInBytes = sizeInBytes)
}
}
@@ -127,10 +139,10 @@ case class InMemoryRelation(
rowCount += 1
}
- batchStats.add(totalSize)
-
val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
.flatMap(_.values))
+
+ batchStats.add(stats)
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
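
For readers skimming the diff: the restored statistics path sums a per-batch size expression over the InternalRow stats entries collected by the CollectionAccumulator. A minimal, Spark-free sketch of that reduction follows; the BatchColumnStats type and its field layout are illustrative stand-ins and are not part of this patch, which instead evaluates a bound `sizeInBytes` expression against each collected row.

object SizeEstimateSketch {
  // Illustrative stand-in for one collected per-batch statistics entry:
  // a byte count per output column of that cached batch.
  final case class BatchColumnStats(bytesPerColumn: Seq[Long])

  // Mirrors the restored reduction: compute a "size of this batch" value for
  // every collected entry and sum the results into the relation's sizeInBytes.
  def estimatedSizeInBytes(collected: Seq[BatchColumnStats]): Long =
    collected.map(_.bytesPerColumn.sum).sum

  def main(args: Array[String]): Unit = {
    val collected = Seq(
      BatchColumnStats(Seq(40L, 80L)),   // batch 1: two columns
      BatchColumnStats(Seq(40L, 120L)))  // batch 2: two columns
    println(estimatedSizeInBytes(collected)) // prints 280
  }
}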
http://git-wip-us.apache.org/repos/asf/spark/blob/9ac68dbc/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0daa29b..9378396 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -232,18 +232,4 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
val columnTypes2 = List.fill(length2)(IntegerType)
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
}
-
- test("SPARK-17549: cached table size should be correctly calculated") {
- val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
- val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
- val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
-
- // Materialize the data.
- val expectedAnswer = data.collect()
- checkAnswer(cached, expectedAnswer)
-
- // Check that the right size was calculated.
- assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
- }
-
}
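
The test removed above asserted against the driver-side LongAccumulator total (expectedAnswer.size * INT.defaultSize), which no longer type-checks once batchStats is a CollectionAccumulator[InternalRow] again. A hedged sketch of a comparable check phrased against the restored code, reusing the suite's existing helpers, might look like the following; this test is not part of the commit.

  // Hypothetical check, not included in this commit.
  test("cached relation reports a materialized size estimate") {
    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)

    // Materialize the cached batches so the accumulator receives stats rows.
    checkAnswer(cached, data.collect())

    // Loose, illustrative assertion: with per-batch stats collected, the
    // relation should report a positive size estimate.
    assert(cached.statistics.sizeInBytes > 0)
  }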
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org