You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/09/16 21:03:00 UTC

spark git commit: [SPARK-17549][SQL] Only collect table size stat in driver for cached relation.

Repository: spark
Updated Branches:
  refs/heads/master b9323fc93 -> 39e2bad6a


[SPARK-17549][SQL] Only collect table size stat in driver for cached relation.

The existing code caches all stats for all columns for each partition
in the driver; for a large relation, this causes extreme memory usage,
which leads to gc hell and application failures.

It seems that only the size in bytes of the data is actually used in the
driver, so instead just collect that. In executors, the full stats are
still kept, but that's not a big problem; we expect the data to be distributed
and thus not to incur too much memory pressure in each individual
executor.

There are also potential improvements on the executor side, since the data
being stored currently is very wasteful (e.g. storing boxed types vs.
primitive types for stats). But that's a separate issue.

In a mildly related change, I'm also adding code to catch exceptions in the
code generator since Janino was breaking with the test data I tried this
patch on.

Tested with unit tests and by doing a count on a very wide table (20k columns)
with many partitions.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #15112 from vanzin/SPARK-17549.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39e2bad6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39e2bad6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39e2bad6

Branch: refs/heads/master
Commit: 39e2bad6a866d27c3ca594d15e574a1da3ee84cc
Parents: b9323fc
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Sep 16 14:02:56 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Sep 16 14:02:56 2016 -0700

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala     | 18 ++++++++++-----
 .../execution/columnar/InMemoryRelation.scala   | 24 +++++---------------
 .../columnar/InMemoryColumnarQuerySuite.scala   | 14 ++++++++++++
 3 files changed, 32 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39e2bad6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index f982c22..33b9b80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -23,6 +23,7 @@ import java.util.{Map => JavaMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
@@ -910,14 +911,19 @@ object CodeGenerator extends Logging {
     codeAttrField.setAccessible(true)
     classes.foreach { case (_, classBytes) =>
       CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
-      val cf = new ClassFile(new ByteArrayInputStream(classBytes))
-      cf.methodInfos.asScala.foreach { method =>
-        method.getAttributes().foreach { a =>
-          if (a.getClass.getName == codeAttr.getName) {
-            CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
-              codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
+      try {
+        val cf = new ClassFile(new ByteArrayInputStream(classBytes))
+        cf.methodInfos.asScala.foreach { method =>
+          method.getAttributes().foreach { a =>
+            if (a.getClass.getName == codeAttr.getName) {
+              CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
+                codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
+            }
           }
         }
+      } catch {
+        case NonFatal(e) =>
+          logWarning("Error calculating stats of compiled class.", e)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/39e2bad6/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 479934a..56bd5c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.columnar
 
-import scala.collection.JavaConverters._
-
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.CollectionAccumulator
+import org.apache.spark.util.LongAccumulator
 
 
 object InMemoryRelation {
@@ -63,8 +61,7 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: CollectionAccumulator[InternalRow] =
-      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -74,21 +71,12 @@ case class InMemoryRelation(
   @transient val partitionStatistics = new PartitionStatistics(output)
 
   override lazy val statistics: Statistics = {
-    if (batchStats.value.isEmpty) {
+    if (batchStats.value == 0L) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      // Underlying columnar RDD has been materialized, required information has also been
-      // collected via the `batchStats` accumulator.
-      val sizeOfRow: Expression =
-        BindReferences.bindReference(
-          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-          partitionStatistics.schema)
-
-      val sizeInBytes =
-        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
-      Statistics(sizeInBytes = sizeInBytes)
+      Statistics(sizeInBytes = batchStats.value.longValue)
     }
   }
 
@@ -139,10 +127,10 @@ case class InMemoryRelation(
             rowCount += 1
           }
 
+          batchStats.add(totalSize)
+
           val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
             .flatMap(_.values))
-
-          batchStats.add(stats)
           CachedBatch(rowCount, columnBuilders.map { builder =>
             JavaUtils.bufferToArray(builder.build())
           }, stats)

http://git-wip-us.apache.org/repos/asf/spark/blob/39e2bad6/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 9378396..0daa29b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -232,4 +232,18 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
+
+  test("SPARK-17549: cached table size should be correctly calculated") {
+    val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
+    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)
+
+    // Materialize the data.
+    val expectedAnswer = data.collect()
+    checkAnswer(cached, expectedAnswer)
+
+    // Check that the right size was calculated.
+    assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org