You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/09/18 17:22:44 UTC

spark git commit: [SPARK-10451] [SQL] Prevent unnecessary serializations in InMemoryColumnarTableScan

Repository: spark
Updated Branches:
  refs/heads/master e3b5d6cb2 -> 20fd35dfd


[SPARK-10451] [SQL] Prevent unnecessary serializations in InMemoryColumnarTableScan

Many of the fields in InMemoryColumnarTableScan and InMemoryRelation can be made transient, so that they are not serialized along with each task.

This reduces my 1000 ms job to about 700 ms. The task size reduces from 2.8 MB to ~1300 KB.

Author: Yash Datta <Ya...@guavus.com>

Closes #8604 from saucam/serde.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20fd35df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20fd35df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20fd35df

Branch: refs/heads/master
Commit: 20fd35dfd1ac402b622604e7bbedcc53a580b0a2
Parents: e3b5d6c
Author: Yash Datta <Ya...@guavus.com>
Authored: Fri Sep 18 08:22:38 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Sep 18 08:22:38 2015 -0700

----------------------------------------------------------------------
 .../columnar/InMemoryColumnarTableScan.scala    | 35 ++++++++++++--------
 1 file changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20fd35df/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 66d429b..d7e145f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -48,10 +48,10 @@ private[sql] case class InMemoryRelation(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
-    child: SparkPlan,
+    @transient child: SparkPlan,
     tableName: Option[String])(
-    private var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    private var _statistics: Statistics = null,
+    @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
+    @transient private var _statistics: Statistics = null,
     private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
@@ -62,7 +62,7 @@ private[sql] case class InMemoryRelation(
       _batchStats
     }
 
-  val partitionStatistics = new PartitionStatistics(output)
+  @transient val partitionStatistics = new PartitionStatistics(output)
 
   private def computeSizeInBytes = {
     val sizeOfRow: Expression =
@@ -196,7 +196,7 @@ private[sql] case class InMemoryRelation(
 private[sql] case class InMemoryColumnarTableScan(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
-    relation: InMemoryRelation)
+    @transient relation: InMemoryRelation)
   extends LeafNode {
 
   override def output: Seq[Attribute] = attributes
@@ -205,7 +205,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
   // Returned filter predicate should return false iff it is impossible for the input expression
   // to evaluate to `true' based on statistics collected about this partition batch.
-  val buildFilter: PartialFunction[Expression, Expression] = {
+  @transient val buildFilter: PartialFunction[Expression, Expression] = {
     case And(lhs: Expression, rhs: Expression)
       if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
       (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
@@ -268,16 +268,23 @@ private[sql] case class InMemoryColumnarTableScan(
       readBatches.setValue(0)
     }
 
-    relation.cachedColumnBuffers.mapPartitions { cachedBatchIterator =>
-      val partitionFilter = newPredicate(
-        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
-        relation.partitionStatistics.schema)
+    // Using these variables here to avoid serialization of entire objects (if referenced directly)
+    // within the map Partitions closure.
+    val schema = relation.partitionStatistics.schema
+    val schemaIndex = schema.zipWithIndex
+    val relOutput = relation.output
+    val buffers = relation.cachedColumnBuffers
+
+    buffers.mapPartitions { cachedBatchIterator =>
+    val partitionFilter = newPredicate(
+      partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+      schema)
 
       // Find the ordinals and data types of the requested columns.  If none are requested, use the
       // narrowest (the field with minimum default element size).
       val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
         val (narrowestOrdinal, narrowestDataType) =
-          relation.output.zipWithIndex.map { case (a, ordinal) =>
+          relOutput.zipWithIndex.map { case (a, ordinal) =>
             ordinal -> a.dataType
           } minBy { case (_, dataType) =>
             ColumnType(dataType).defaultSize
@@ -285,7 +292,7 @@ private[sql] case class InMemoryColumnarTableScan(
         Seq(narrowestOrdinal) -> Seq(narrowestDataType)
       } else {
         attributes.map { a =>
-          relation.output.indexWhere(_.exprId == a.exprId) -> a.dataType
+          relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
         }.unzip
       }
 
@@ -296,7 +303,7 @@ private[sql] case class InMemoryColumnarTableScan(
           // Build column accessors
           val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
             ColumnAccessor(
-              relation.output(batchColumnIndex).dataType,
+              relOutput(batchColumnIndex).dataType,
               ByteBuffer.wrap(cachedBatch.buffers(batchColumnIndex)))
           }
 
@@ -328,7 +335,7 @@ private[sql] case class InMemoryColumnarTableScan(
         if (inMemoryPartitionPruningEnabled) {
           cachedBatchIterator.filter { cachedBatch =>
             if (!partitionFilter(cachedBatch.stats)) {
-              def statsString: String = relation.partitionStatistics.schema.zipWithIndex.map {
+              def statsString: String = schemaIndex.map {
                 case (a, i) =>
                   val value = cachedBatch.stats.get(i, a.dataType)
                   s"${a.name}: $value"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org