You are viewing a plain-text version of this content; the canonical hyperlink to the original message was not preserved in this plain-text rendering.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/30 05:46:07 UTC

spark git commit: [SPARK-9458] Avoid object allocation in prefix generation.

Repository: spark
Updated Branches:
  refs/heads/master a200e6456 -> 9514d874f


[SPARK-9458] Avoid object allocation in prefix generation.

In our existing sort prefix generation code, we use expression's eval method to generate the prefix, which results in object allocation for every prefix. We can use the specialized getters available on InternalRow directly to avoid the object allocation.

I also removed the FLOAT prefix, opting for converting float directly to double.

Author: Reynold Xin <rx...@databricks.com>

Closes #7763 from rxin/sort-prefix and squashes the following commits:

5dc2f06 [Reynold Xin] [SPARK-9458] Avoid object allocation in prefix generation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9514d874
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9514d874
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9514d874

Branch: refs/heads/master
Commit: 9514d874f0cf61f1eb4ec4f5f66e053119f769c9
Parents: a200e64
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Jul 29 20:46:03 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jul 29 20:46:03 2015 -0700

----------------------------------------------------------------------
 .../unsafe/sort/PrefixComparators.java          | 16 ------
 .../unsafe/sort/PrefixComparatorsSuite.scala    | 12 -----
 .../sql/execution/UnsafeExternalRowSorter.java  |  2 +-
 .../spark/sql/execution/SortPrefixUtils.scala   | 51 +++++++++-----------
 .../spark/sql/execution/SparkStrategies.scala   |  4 +-
 .../org/apache/spark/sql/execution/sort.scala   |  5 +-
 .../execution/RowFormatConvertersSuite.scala    |  2 +-
 .../sql/execution/UnsafeExternalSortSuite.scala | 10 ++--
 8 files changed, 35 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9514d874/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index bf1bc5d..5624e06 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -31,7 +31,6 @@ public class PrefixComparators {
 
   public static final StringPrefixComparator STRING = new StringPrefixComparator();
   public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator();
-  public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
   public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
 
   public static final class StringPrefixComparator extends PrefixComparator {
@@ -78,21 +77,6 @@ public class PrefixComparators {
     public final long NULL_PREFIX = Long.MIN_VALUE;
   }
 
-  public static final class FloatPrefixComparator extends PrefixComparator {
-    @Override
-    public int compare(long aPrefix, long bPrefix) {
-      float a = Float.intBitsToFloat((int) aPrefix);
-      float b = Float.intBitsToFloat((int) bPrefix);
-      return Utils.nanSafeCompareFloats(a, b);
-    }
-
-    public long computePrefix(float value) {
-      return Float.floatToIntBits(value) & 0xffffffffL;
-    }
-
-    public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY);
-  }
-
   public static final class DoublePrefixComparator extends PrefixComparator {
     @Override
     public int compare(long aPrefix, long bPrefix) {

http://git-wip-us.apache.org/repos/asf/spark/blob/9514d874/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index dc03e37..28fe925 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -48,18 +48,6 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
     forAll { (s1: String, s2: String) => testPrefixComparison(s1, s2) }
   }
 
-  test("float prefix comparator handles NaN properly") {
-    val nan1: Float = java.lang.Float.intBitsToFloat(0x7f800001)
-    val nan2: Float = java.lang.Float.intBitsToFloat(0x7fffffff)
-    assert(nan1.isNaN)
-    assert(nan2.isNaN)
-    val nan1Prefix = PrefixComparators.FLOAT.computePrefix(nan1)
-    val nan2Prefix = PrefixComparators.FLOAT.computePrefix(nan2)
-    assert(nan1Prefix === nan2Prefix)
-    val floatMaxPrefix = PrefixComparators.FLOAT.computePrefix(Float.MaxValue)
-    assert(PrefixComparators.FLOAT.compare(nan1Prefix, floatMaxPrefix) === 1)
-  }
-
   test("double prefix comparator handles NaNs properly") {
     val nan1: Double = java.lang.Double.longBitsToDouble(0x7ff0000000000001L)
     val nan2: Double = java.lang.Double.longBitsToDouble(0x7fffffffffffffffL)

http://git-wip-us.apache.org/repos/asf/spark/blob/9514d874/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 4c3f2c6..8342833 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -121,7 +121,7 @@ final class UnsafeExternalRowSorter {
         // here in order to prevent memory leaks.
         cleanupResources();
       }
-      return new AbstractScalaRowIterator() {
+      return new AbstractScalaRowIterator<InternalRow>() {
 
         private final int numFields = schema.length();
         private UnsafeRow row = new UnsafeRow();

http://git-wip-us.apache.org/repos/asf/spark/blob/9514d874/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 2dee354..050d27f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, SortOrder}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, PrefixComparator}
@@ -39,57 +39,54 @@ object SortPrefixUtils {
     sortOrder.dataType match {
       case StringType => PrefixComparators.STRING
       case BooleanType | ByteType | ShortType | IntegerType | LongType => PrefixComparators.INTEGRAL
-      case FloatType => PrefixComparators.FLOAT
-      case DoubleType => PrefixComparators.DOUBLE
+      case FloatType | DoubleType => PrefixComparators.DOUBLE
       case _ => NoOpPrefixComparator
     }
   }
 
   def getPrefixComputer(sortOrder: SortOrder): InternalRow => Long = {
+    val bound = sortOrder.child.asInstanceOf[BoundReference]
+    val pos = bound.ordinal
     sortOrder.dataType match {
-      case StringType => (row: InternalRow) => {
-        PrefixComparators.STRING.computePrefix(sortOrder.child.eval(row).asInstanceOf[UTF8String])
-      }
+      case StringType =>
+        (row: InternalRow) => {
+          PrefixComparators.STRING.computePrefix(row.getUTF8String(pos))
+        }
       case BooleanType =>
         (row: InternalRow) => {
-          val exprVal = sortOrder.child.eval(row)
-          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
-          else if (sortOrder.child.eval(row).asInstanceOf[Boolean]) 1
+          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else if (row.getBoolean(pos)) 1
           else 0
         }
       case ByteType =>
         (row: InternalRow) => {
-          val exprVal = sortOrder.child.eval(row)
-          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
-          else sortOrder.child.eval(row).asInstanceOf[Byte]
+          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getByte(pos)
         }
       case ShortType =>
         (row: InternalRow) => {
-          val exprVal = sortOrder.child.eval(row)
-          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
-          else sortOrder.child.eval(row).asInstanceOf[Short]
+          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getShort(pos)
         }
       case IntegerType =>
         (row: InternalRow) => {
-          val exprVal = sortOrder.child.eval(row)
-          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
-          else sortOrder.child.eval(row).asInstanceOf[Int]
+          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getInt(pos)
         }
       case LongType =>
         (row: InternalRow) => {
-          val exprVal = sortOrder.child.eval(row)
-          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
-          else sortOrder.child.eval(row).asInstanceOf[Long]
+          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getLong(pos)
         }
       case FloatType => (row: InternalRow) => {
-        val exprVal = sortOrder.child.eval(row)
-        if (exprVal == null) PrefixComparators.FLOAT.NULL_PREFIX
-        else PrefixComparators.FLOAT.computePrefix(sortOrder.child.eval(row).asInstanceOf[Float])
+        if (row.isNullAt(pos)) {
+          PrefixComparators.DOUBLE.NULL_PREFIX
+        } else {
+          PrefixComparators.DOUBLE.computePrefix(row.getFloat(pos).toDouble)
+        }
       }
       case DoubleType => (row: InternalRow) => {
-        val exprVal = sortOrder.child.eval(row)
-        if (exprVal == null) PrefixComparators.DOUBLE.NULL_PREFIX
-        else PrefixComparators.DOUBLE.computePrefix(sortOrder.child.eval(row).asInstanceOf[Double])
+        if (row.isNullAt(pos)) {
+          PrefixComparators.DOUBLE.NULL_PREFIX
+        } else {
+          PrefixComparators.DOUBLE.computePrefix(row.getDouble(pos))
+        }
       }
       case _ => (row: InternalRow) => 0L
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9514d874/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f3ef066..4ab2c41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -340,8 +340,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      */
     def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
       if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
-        UnsafeExternalSort.supportsSchema(child.schema)) {
-        execution.UnsafeExternalSort(sortExprs, global, child)
+        TungstenSort.supportsSchema(child.schema)) {
+        execution.TungstenSort(sortExprs, global, child)
       } else if (sqlContext.conf.externalSortEnabled) {
         execution.ExternalSort(sortExprs, global, child)
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/9514d874/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index f822088..d0ad310 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -97,7 +97,7 @@ case class ExternalSort(
  * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
  *                           spill every `frequency` records.
  */
-case class UnsafeExternalSort(
+case class TungstenSort(
     sortOrder: Seq[SortOrder],
     global: Boolean,
     child: SparkPlan,
@@ -110,7 +110,6 @@ case class UnsafeExternalSort(
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
     def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
       val ordering = newOrdering(sortOrder, child.output)
       val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
@@ -149,7 +148,7 @@ case class UnsafeExternalSort(
 }
 
 @DeveloperApi
-object UnsafeExternalSort {
+object TungstenSort {
   /**
    * Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/9514d874/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 7b75f75..c458f95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -31,7 +31,7 @@ class RowFormatConvertersSuite extends SparkPlanTest {
 
   private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
   assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
+  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null))
   assert(outputsUnsafe.outputsUnsafeRows)
 
   test("planner should insert unsafe->safe conversions when required") {

http://git-wip-us.apache.org/repos/asf/spark/blob/9514d874/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
index 7a4baa9..9cabc4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -42,7 +42,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
     checkThatPlansAgree(
       (1 to 100).map(v => Tuple1(v)).toDF("a"),
-      (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
+      (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
       (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
       sortAnswers = false
     )
@@ -53,7 +53,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     try {
       checkThatPlansAgree(
         (1 to 100).map(v => Tuple1(v)).toDF("a"),
-        (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
+        (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
         (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
         sortAnswers = false
       )
@@ -68,7 +68,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     val stringLength = 1024 * 1024 * 2
     checkThatPlansAgree(
       Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
-      UnsafeExternalSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+      TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
       Sort(sortOrder, global = true, _: SparkPlan),
       sortAnswers = false
     )
@@ -88,11 +88,11 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
         TestSQLContext.sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
         StructType(StructField("a", dataType, nullable = true) :: Nil)
       )
-      assert(UnsafeExternalSort.supportsSchema(inputDf.schema))
+      assert(TungstenSort.supportsSchema(inputDf.schema))
       checkThatPlansAgree(
         inputDf,
         plan => ConvertToSafe(
-          UnsafeExternalSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
+          TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
         Sort(sortOrder, global = true, _: SparkPlan),
         sortAnswers = false
       )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org