Posted to commits@spark.apache.org by sr...@apache.org on 2016/09/14 08:51:17 UTC

spark git commit: [SPARK-17480][SQL] Improve performance by removing or caching List.length which is O(n)

Repository: spark
Updated Branches:
  refs/heads/master 18b4f035f -> 4cea9da2a


[SPARK-17480][SQL] Improve performance by removing or caching List.length which is O(n)

## What changes were proposed in this pull request?
Scala's List.length method is O(N), which makes the gatherCompressibilityStats function O(N^2) overall. This patch eliminates the repeated List.length calls by rewriting the loops idiomatically, either replacing them with a single traversal or caching the length before the loop.

https://github.com/scala/scala/blob/2.10.x/src/library/scala/collection/LinearSeqOptimized.scala#L36
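To illustrate (a minimal standalone sketch, not code from this patch): on an immutable List, evaluating `length` in a loop guard re-traverses the list on every iteration, so the index loop below is quadratic, while a single `foreach` traversal is linear.

```scala
// Hypothetical standalone example, not code from this patch.
// On scala.collection.immutable.List, `length` walks the whole list,
// so the guard alone makes this loop O(N^2). Indexed access xs(i)
// is also O(i) on a linked list, which makes it even worse.
def sumQuadratic(xs: List[Int]): Int = {
  var sum = 0
  var i = 0
  while (i < xs.length) {
    sum += xs(i)
    i += 1
  }
  sum
}

// Single O(N) traversal -- the idiomatic replacement this patch uses
// in CompressibleColumnBuilder.gatherCompressibilityStats.
def sumLinear(xs: List[Int]): Int = {
  var sum = 0
  xs.foreach(sum += _)
  sum
}
```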

As suggested, the fix was also extended to the HiveInspectors and AggregationIterator classes.
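
In those classes the index-based loop is kept, but the length is computed once before the loop rather than in the guard on every iteration. A minimal sketch of that pattern (hypothetical names, not the patch itself):

```scala
// Hypothetical sketch of the length-hoisting pattern applied in
// HiveInspectors and AggregationIterator. `elems.length` may be O(N)
// when the Seq is backed by a List, so it is evaluated a single time
// rather than once per loop iteration.
def fillCache(elems: Seq[AnyRef], out: Array[AnyRef]): Unit = {
  val length = elems.length // computed once, not on every guard check
  var i = 0
  while (i < length) {
    out(i) = elems(i) // note: still O(i) on a List-backed Seq
    i += 1
  }
}
```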

## How was this patch tested?
Profiled a Spark job and found that CompressibleColumnBuilder was using 39% of the CPU. Of that, gatherCompressibilityStats accounted for 23%, and 6.24% of the total CPU was spent in List.length, which is called inside gatherCompressibilityStats.

After this change, that 6.24% of CPU time is saved.

Author: Ergin Seyfe <es...@fb.com>

Closes #15032 from seyfe/gatherCompressibilityStats.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cea9da2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cea9da2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cea9da2

Branch: refs/heads/master
Commit: 4cea9da2ae88b40a5503111f8f37051e2372163e
Parents: 18b4f03
Author: Ergin Seyfe <es...@fb.com>
Authored: Wed Sep 14 09:51:14 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Sep 14 09:51:14 2016 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/aggregate/AggregationIterator.scala   | 7 ++++---
 .../columnar/compression/CompressibleColumnBuilder.scala      | 6 +-----
 .../main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 6 ++++--
 3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4cea9da2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index dfed084..f335912 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -73,9 +73,10 @@ abstract class AggregationIterator(
       startingInputBufferOffset: Int): Array[AggregateFunction] = {
     var mutableBufferOffset = 0
     var inputBufferOffset: Int = startingInputBufferOffset
-    val functions = new Array[AggregateFunction](expressions.length)
+    val expressionsLength = expressions.length
+    val functions = new Array[AggregateFunction](expressionsLength)
     var i = 0
-    while (i < expressions.length) {
+    while (i < expressionsLength) {
       val func = expressions(i).aggregateFunction
       val funcWithBoundReferences: AggregateFunction = expressions(i).mode match {
         case Partial | Complete if func.isInstanceOf[ImperativeAggregate] =>
@@ -171,7 +172,7 @@ abstract class AggregationIterator(
             case PartialMerge | Final =>
               (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
           }
-      }
+      }.toArray
       // This projection is used to merge buffer values for all expression-based aggregates.
       val aggregationBufferSchema = functions.flatMap(_.aggBufferAttributes)
       val updateProjection =

http://git-wip-us.apache.org/repos/asf/spark/blob/4cea9da2/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
index 63eae1b..0f4680e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala
@@ -66,11 +66,7 @@ private[columnar] trait CompressibleColumnBuilder[T <: AtomicType]
   }
 
   private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
-    var i = 0
-    while (i < compressionEncoders.length) {
-      compressionEncoders(i).gatherCompressibilityStats(row, ordinal)
-      i += 1
-    }
+    compressionEncoders.foreach(_.gatherCompressibilityStats(row, ordinal))
   }
 
   abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4cea9da2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index bf5cc17..4e74452 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -756,7 +756,8 @@ private[hive] trait HiveInspectors {
       cache: Array[AnyRef],
       dataTypes: Array[DataType]): Array[AnyRef] = {
     var i = 0
-    while (i < inspectors.length) {
+    val length = inspectors.length
+    while (i < length) {
       cache(i) = wrap(row.get(i, dataTypes(i)), inspectors(i), dataTypes(i))
       i += 1
     }
@@ -769,7 +770,8 @@ private[hive] trait HiveInspectors {
       cache: Array[AnyRef],
       dataTypes: Array[DataType]): Array[AnyRef] = {
     var i = 0
-    while (i < inspectors.length) {
+    val length = inspectors.length
+    while (i < length) {
       cache(i) = wrap(row(i), inspectors(i), dataTypes(i))
       i += 1
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org