You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/09/18 17:22:44 UTC
spark git commit: [SPARK-10451] [SQL] Prevent unnecessary serializations in InMemoryColumnarTableScan
Repository: spark
Updated Branches:
refs/heads/master e3b5d6cb2 -> 20fd35dfd
[SPARK-10451] [SQL] Prevent unnecessary serializations in InMemoryColumnarTableScan
Many of the fields in InMemoryColumnarTableScan and InMemoryRelation can be made transient.
This reduces my 1000 ms job to about 700 ms. The task size drops from 2.8 MB to ~1300 KB.
Author: Yash Datta <Ya...@guavus.com>
Closes #8604 from saucam/serde.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20fd35df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20fd35df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20fd35df
Branch: refs/heads/master
Commit: 20fd35dfd1ac402b622604e7bbedcc53a580b0a2
Parents: e3b5d6c
Author: Yash Datta <Ya...@guavus.com>
Authored: Fri Sep 18 08:22:38 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Sep 18 08:22:38 2015 -0700
----------------------------------------------------------------------
.../columnar/InMemoryColumnarTableScan.scala | 35 ++++++++++++--------
1 file changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/20fd35df/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 66d429b..d7e145f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -48,10 +48,10 @@ private[sql] case class InMemoryRelation(
useCompression: Boolean,
batchSize: Int,
storageLevel: StorageLevel,
- child: SparkPlan,
+ @transient child: SparkPlan,
tableName: Option[String])(
- private var _cachedColumnBuffers: RDD[CachedBatch] = null,
- private var _statistics: Statistics = null,
+ @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
+ @transient private var _statistics: Statistics = null,
private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
extends LogicalPlan with MultiInstanceRelation {
@@ -62,7 +62,7 @@ private[sql] case class InMemoryRelation(
_batchStats
}
- val partitionStatistics = new PartitionStatistics(output)
+ @transient val partitionStatistics = new PartitionStatistics(output)
private def computeSizeInBytes = {
val sizeOfRow: Expression =
@@ -196,7 +196,7 @@ private[sql] case class InMemoryRelation(
private[sql] case class InMemoryColumnarTableScan(
attributes: Seq[Attribute],
predicates: Seq[Expression],
- relation: InMemoryRelation)
+ @transient relation: InMemoryRelation)
extends LeafNode {
override def output: Seq[Attribute] = attributes
@@ -205,7 +205,7 @@ private[sql] case class InMemoryColumnarTableScan(
// Returned filter predicate should return false iff it is impossible for the input expression
// to evaluate to `true' based on statistics collected about this partition batch.
- val buildFilter: PartialFunction[Expression, Expression] = {
+ @transient val buildFilter: PartialFunction[Expression, Expression] = {
case And(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
(buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
@@ -268,16 +268,23 @@ private[sql] case class InMemoryColumnarTableScan(
readBatches.setValue(0)
}
- relation.cachedColumnBuffers.mapPartitions { cachedBatchIterator =>
- val partitionFilter = newPredicate(
- partitionFilters.reduceOption(And).getOrElse(Literal(true)),
- relation.partitionStatistics.schema)
+ // Using these variables here to avoid serialization of entire objects (if referenced directly)
+ // within the map Partitions closure.
+ val schema = relation.partitionStatistics.schema
+ val schemaIndex = schema.zipWithIndex
+ val relOutput = relation.output
+ val buffers = relation.cachedColumnBuffers
+
+ buffers.mapPartitions { cachedBatchIterator =>
+ val partitionFilter = newPredicate(
+ partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+ schema)
// Find the ordinals and data types of the requested columns. If none are requested, use the
// narrowest (the field with minimum default element size).
val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
val (narrowestOrdinal, narrowestDataType) =
- relation.output.zipWithIndex.map { case (a, ordinal) =>
+ relOutput.zipWithIndex.map { case (a, ordinal) =>
ordinal -> a.dataType
} minBy { case (_, dataType) =>
ColumnType(dataType).defaultSize
@@ -285,7 +292,7 @@ private[sql] case class InMemoryColumnarTableScan(
Seq(narrowestOrdinal) -> Seq(narrowestDataType)
} else {
attributes.map { a =>
- relation.output.indexWhere(_.exprId == a.exprId) -> a.dataType
+ relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType
}.unzip
}
@@ -296,7 +303,7 @@ private[sql] case class InMemoryColumnarTableScan(
// Build column accessors
val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
ColumnAccessor(
- relation.output(batchColumnIndex).dataType,
+ relOutput(batchColumnIndex).dataType,
ByteBuffer.wrap(cachedBatch.buffers(batchColumnIndex)))
}
@@ -328,7 +335,7 @@ private[sql] case class InMemoryColumnarTableScan(
if (inMemoryPartitionPruningEnabled) {
cachedBatchIterator.filter { cachedBatch =>
if (!partitionFilter(cachedBatch.stats)) {
- def statsString: String = relation.partitionStatistics.schema.zipWithIndex.map {
+ def statsString: String = schemaIndex.map {
case (a, i) =>
val value = cachedBatch.stats.get(i, a.dataType)
s"${a.name}: $value"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org