You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/30 10:04:31 UTC

spark git commit: Revert "[SPARK-9458] Avoid object allocation in prefix generation."

Repository: spark
Updated Branches:
  refs/heads/master 76f2e393a -> 4a8bb9d00


Revert "[SPARK-9458] Avoid object allocation in prefix generation."

This reverts commit 9514d874f0cf61f1eb4ec4f5f66e053119f769c9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a8bb9d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a8bb9d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a8bb9d0

Branch: refs/heads/master
Commit: 4a8bb9d00d8181aff5f5183194d9aa2a65deacdf
Parents: 76f2e39
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Jul 30 01:04:24 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jul 30 01:04:24 2015 -0700

----------------------------------------------------------------------
 .../unsafe/sort/PrefixComparators.java          | 16 ++++++
 .../unsafe/sort/PrefixComparatorsSuite.scala    | 12 +++++
 .../sql/execution/UnsafeExternalRowSorter.java  |  2 +-
 .../spark/sql/execution/SortPrefixUtils.scala   | 51 +++++++++++---------
 .../spark/sql/execution/SparkStrategies.scala   |  4 +-
 .../org/apache/spark/sql/execution/sort.scala   |  5 +-
 .../execution/RowFormatConvertersSuite.scala    |  2 +-
 .../sql/execution/UnsafeExternalSortSuite.scala | 10 ++--
 8 files changed, 67 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index a9ee604..600aff7 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -29,6 +29,7 @@ public class PrefixComparators {
 
   public static final StringPrefixComparator STRING = new StringPrefixComparator();
   public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator();
+  public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
   public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
 
   public static final class StringPrefixComparator extends PrefixComparator {
@@ -54,6 +55,21 @@ public class PrefixComparators {
     public final long NULL_PREFIX = Long.MIN_VALUE;
   }
 
+  public static final class FloatPrefixComparator extends PrefixComparator {
+    @Override
+    public int compare(long aPrefix, long bPrefix) {
+      float a = Float.intBitsToFloat((int) aPrefix);
+      float b = Float.intBitsToFloat((int) bPrefix);
+      return Utils.nanSafeCompareFloats(a, b);
+    }
+
+    public long computePrefix(float value) {
+      return Float.floatToIntBits(value) & 0xffffffffL;
+    }
+
+    public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY);
+  }
+
   public static final class DoublePrefixComparator extends PrefixComparator {
     @Override
     public int compare(long aPrefix, long bPrefix) {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index 26b7a9e..cf53a8a 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -55,6 +55,18 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
     forAll { (s1: String, s2: String) => testPrefixComparison(s1, s2) }
   }
 
+  test("float prefix comparator handles NaN properly") {
+    val nan1: Float = java.lang.Float.intBitsToFloat(0x7f800001)
+    val nan2: Float = java.lang.Float.intBitsToFloat(0x7fffffff)
+    assert(nan1.isNaN)
+    assert(nan2.isNaN)
+    val nan1Prefix = PrefixComparators.FLOAT.computePrefix(nan1)
+    val nan2Prefix = PrefixComparators.FLOAT.computePrefix(nan2)
+    assert(nan1Prefix === nan2Prefix)
+    val floatMaxPrefix = PrefixComparators.FLOAT.computePrefix(Float.MaxValue)
+    assert(PrefixComparators.FLOAT.compare(nan1Prefix, floatMaxPrefix) === 1)
+  }
+
   test("double prefix comparator handles NaNs properly") {
     val nan1: Double = java.lang.Double.longBitsToDouble(0x7ff0000000000001L)
     val nan2: Double = java.lang.Double.longBitsToDouble(0x7fffffffffffffffL)

http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 8342833..4c3f2c6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -121,7 +121,7 @@ final class UnsafeExternalRowSorter {
         // here in order to prevent memory leaks.
         cleanupResources();
       }
-      return new AbstractScalaRowIterator<InternalRow>() {
+      return new AbstractScalaRowIterator() {
 
         private final int numFields = schema.length();
         private UnsafeRow row = new UnsafeRow();

http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 050d27f..2dee354 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, PrefixComparator}
@@ -39,54 +39,57 @@ object SortPrefixUtils {
     sortOrder.dataType match {
       case StringType => PrefixComparators.STRING
       case BooleanType | ByteType | ShortType | IntegerType | LongType => PrefixComparators.INTEGRAL
-      case FloatType | DoubleType => PrefixComparators.DOUBLE
+      case FloatType => PrefixComparators.FLOAT
+      case DoubleType => PrefixComparators.DOUBLE
       case _ => NoOpPrefixComparator
     }
   }
 
   def getPrefixComputer(sortOrder: SortOrder): InternalRow => Long = {
-    val bound = sortOrder.child.asInstanceOf[BoundReference]
-    val pos = bound.ordinal
     sortOrder.dataType match {
-      case StringType =>
-        (row: InternalRow) => {
-          PrefixComparators.STRING.computePrefix(row.getUTF8String(pos))
-        }
+      case StringType => (row: InternalRow) => {
+        PrefixComparators.STRING.computePrefix(sortOrder.child.eval(row).asInstanceOf[UTF8String])
+      }
       case BooleanType =>
         (row: InternalRow) => {
-          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX
-          else if (row.getBoolean(pos)) 1
+          val exprVal = sortOrder.child.eval(row)
+          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else if (sortOrder.child.eval(row).asInstanceOf[Boolean]) 1
           else 0
         }
       case ByteType =>
         (row: InternalRow) => {
-          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getByte(pos)
+          val exprVal = sortOrder.child.eval(row)
+          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else sortOrder.child.eval(row).asInstanceOf[Byte]
         }
       case ShortType =>
         (row: InternalRow) => {
-          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getShort(pos)
+          val exprVal = sortOrder.child.eval(row)
+          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else sortOrder.child.eval(row).asInstanceOf[Short]
         }
       case IntegerType =>
         (row: InternalRow) => {
-          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getInt(pos)
+          val exprVal = sortOrder.child.eval(row)
+          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else sortOrder.child.eval(row).asInstanceOf[Int]
         }
       case LongType =>
         (row: InternalRow) => {
-          if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getLong(pos)
+          val exprVal = sortOrder.child.eval(row)
+          if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+          else sortOrder.child.eval(row).asInstanceOf[Long]
         }
       case FloatType => (row: InternalRow) => {
-        if (row.isNullAt(pos)) {
-          PrefixComparators.DOUBLE.NULL_PREFIX
-        } else {
-          PrefixComparators.DOUBLE.computePrefix(row.getFloat(pos).toDouble)
-        }
+        val exprVal = sortOrder.child.eval(row)
+        if (exprVal == null) PrefixComparators.FLOAT.NULL_PREFIX
+        else PrefixComparators.FLOAT.computePrefix(sortOrder.child.eval(row).asInstanceOf[Float])
       }
       case DoubleType => (row: InternalRow) => {
-        if (row.isNullAt(pos)) {
-          PrefixComparators.DOUBLE.NULL_PREFIX
-        } else {
-          PrefixComparators.DOUBLE.computePrefix(row.getDouble(pos))
-        }
+        val exprVal = sortOrder.child.eval(row)
+        if (exprVal == null) PrefixComparators.DOUBLE.NULL_PREFIX
+        else PrefixComparators.DOUBLE.computePrefix(sortOrder.child.eval(row).asInstanceOf[Double])
       }
       case _ => (row: InternalRow) => 0L
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4ab2c41..f3ef066 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -340,8 +340,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      */
     def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
       if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
-        TungstenSort.supportsSchema(child.schema)) {
-        execution.TungstenSort(sortExprs, global, child)
+        UnsafeExternalSort.supportsSchema(child.schema)) {
+        execution.UnsafeExternalSort(sortExprs, global, child)
       } else if (sqlContext.conf.externalSortEnabled) {
         execution.ExternalSort(sortExprs, global, child)
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index d0ad310..f822088 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -97,7 +97,7 @@ case class ExternalSort(
  * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
  *                           spill every `frequency` records.
  */
-case class TungstenSort(
+case class UnsafeExternalSort(
     sortOrder: Seq[SortOrder],
     global: Boolean,
     child: SparkPlan,
@@ -110,6 +110,7 @@ case class TungstenSort(
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
+    assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
     def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
       val ordering = newOrdering(sortOrder, child.output)
       val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
@@ -148,7 +149,7 @@ case class TungstenSort(
 }
 
 @DeveloperApi
-object TungstenSort {
+object UnsafeExternalSort {
   /**
    * Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index c458f95..7b75f75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -31,7 +31,7 @@ class RowFormatConvertersSuite extends SparkPlanTest {
 
   private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
   assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null))
+  private val outputsUnsafe = UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
   assert(outputsUnsafe.outputsUnsafeRows)
 
   test("planner should insert unsafe->safe conversions when required") {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
index 9cabc4b..7a4baa9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -42,7 +42,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
     checkThatPlansAgree(
       (1 to 100).map(v => Tuple1(v)).toDF("a"),
-      (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
+      (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
       (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
       sortAnswers = false
     )
@@ -53,7 +53,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     try {
       checkThatPlansAgree(
         (1 to 100).map(v => Tuple1(v)).toDF("a"),
-        (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
+        (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
         (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
         sortAnswers = false
       )
@@ -68,7 +68,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
     val stringLength = 1024 * 1024 * 2
     checkThatPlansAgree(
       Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
-      TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+      UnsafeExternalSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
       Sort(sortOrder, global = true, _: SparkPlan),
       sortAnswers = false
     )
@@ -88,11 +88,11 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
         TestSQLContext.sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
         StructType(StructField("a", dataType, nullable = true) :: Nil)
       )
-      assert(TungstenSort.supportsSchema(inputDf.schema))
+      assert(UnsafeExternalSort.supportsSchema(inputDf.schema))
       checkThatPlansAgree(
         inputDf,
         plan => ConvertToSafe(
-          TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
+          UnsafeExternalSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
         Sort(sortOrder, global = true, _: SparkPlan),
         sortAnswers = false
       )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org