You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/30 10:04:31 UTC
spark git commit: Revert "[SPARK-9458] Avoid object allocation in prefix generation."
Repository: spark
Updated Branches:
refs/heads/master 76f2e393a -> 4a8bb9d00
Revert "[SPARK-9458] Avoid object allocation in prefix generation."
This reverts commit 9514d874f0cf61f1eb4ec4f5f66e053119f769c9.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a8bb9d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a8bb9d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a8bb9d0
Branch: refs/heads/master
Commit: 4a8bb9d00d8181aff5f5183194d9aa2a65deacdf
Parents: 76f2e39
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Jul 30 01:04:24 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jul 30 01:04:24 2015 -0700
----------------------------------------------------------------------
.../unsafe/sort/PrefixComparators.java | 16 ++++++
.../unsafe/sort/PrefixComparatorsSuite.scala | 12 +++++
.../sql/execution/UnsafeExternalRowSorter.java | 2 +-
.../spark/sql/execution/SortPrefixUtils.scala | 51 +++++++++++---------
.../spark/sql/execution/SparkStrategies.scala | 4 +-
.../org/apache/spark/sql/execution/sort.scala | 5 +-
.../execution/RowFormatConvertersSuite.scala | 2 +-
.../sql/execution/UnsafeExternalSortSuite.scala | 10 ++--
8 files changed, 67 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index a9ee604..600aff7 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -29,6 +29,7 @@ public class PrefixComparators {
public static final StringPrefixComparator STRING = new StringPrefixComparator();
public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator();
+ public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
public static final class StringPrefixComparator extends PrefixComparator {
@@ -54,6 +55,21 @@ public class PrefixComparators {
public final long NULL_PREFIX = Long.MIN_VALUE;
}
+ public static final class FloatPrefixComparator extends PrefixComparator {
+ @Override
+ public int compare(long aPrefix, long bPrefix) {
+ float a = Float.intBitsToFloat((int) aPrefix);
+ float b = Float.intBitsToFloat((int) bPrefix);
+ return Utils.nanSafeCompareFloats(a, b);
+ }
+
+ public long computePrefix(float value) {
+ return Float.floatToIntBits(value) & 0xffffffffL;
+ }
+
+ public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY);
+ }
+
public static final class DoublePrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index 26b7a9e..cf53a8a 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -55,6 +55,18 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
forAll { (s1: String, s2: String) => testPrefixComparison(s1, s2) }
}
+ test("float prefix comparator handles NaN properly") {
+ val nan1: Float = java.lang.Float.intBitsToFloat(0x7f800001)
+ val nan2: Float = java.lang.Float.intBitsToFloat(0x7fffffff)
+ assert(nan1.isNaN)
+ assert(nan2.isNaN)
+ val nan1Prefix = PrefixComparators.FLOAT.computePrefix(nan1)
+ val nan2Prefix = PrefixComparators.FLOAT.computePrefix(nan2)
+ assert(nan1Prefix === nan2Prefix)
+ val floatMaxPrefix = PrefixComparators.FLOAT.computePrefix(Float.MaxValue)
+ assert(PrefixComparators.FLOAT.compare(nan1Prefix, floatMaxPrefix) === 1)
+ }
+
test("double prefix comparator handles NaNs properly") {
val nan1: Double = java.lang.Double.longBitsToDouble(0x7ff0000000000001L)
val nan2: Double = java.lang.Double.longBitsToDouble(0x7fffffffffffffffL)
http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 8342833..4c3f2c6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -121,7 +121,7 @@ final class UnsafeExternalRowSorter {
// here in order to prevent memory leaks.
cleanupResources();
}
- return new AbstractScalaRowIterator<InternalRow>() {
+ return new AbstractScalaRowIterator() {
private final int numFields = schema.length();
private UnsafeRow row = new UnsafeRow();
http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 050d27f..2dee354 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, PrefixComparator}
@@ -39,54 +39,57 @@ object SortPrefixUtils {
sortOrder.dataType match {
case StringType => PrefixComparators.STRING
case BooleanType | ByteType | ShortType | IntegerType | LongType => PrefixComparators.INTEGRAL
- case FloatType | DoubleType => PrefixComparators.DOUBLE
+ case FloatType => PrefixComparators.FLOAT
+ case DoubleType => PrefixComparators.DOUBLE
case _ => NoOpPrefixComparator
}
}
def getPrefixComputer(sortOrder: SortOrder): InternalRow => Long = {
- val bound = sortOrder.child.asInstanceOf[BoundReference]
- val pos = bound.ordinal
sortOrder.dataType match {
- case StringType =>
- (row: InternalRow) => {
- PrefixComparators.STRING.computePrefix(row.getUTF8String(pos))
- }
+ case StringType => (row: InternalRow) => {
+ PrefixComparators.STRING.computePrefix(sortOrder.child.eval(row).asInstanceOf[UTF8String])
+ }
case BooleanType =>
(row: InternalRow) => {
- if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX
- else if (row.getBoolean(pos)) 1
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+ else if (sortOrder.child.eval(row).asInstanceOf[Boolean]) 1
else 0
}
case ByteType =>
(row: InternalRow) => {
- if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getByte(pos)
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+ else sortOrder.child.eval(row).asInstanceOf[Byte]
}
case ShortType =>
(row: InternalRow) => {
- if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getShort(pos)
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+ else sortOrder.child.eval(row).asInstanceOf[Short]
}
case IntegerType =>
(row: InternalRow) => {
- if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getInt(pos)
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+ else sortOrder.child.eval(row).asInstanceOf[Int]
}
case LongType =>
(row: InternalRow) => {
- if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getLong(pos)
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX
+ else sortOrder.child.eval(row).asInstanceOf[Long]
}
case FloatType => (row: InternalRow) => {
- if (row.isNullAt(pos)) {
- PrefixComparators.DOUBLE.NULL_PREFIX
- } else {
- PrefixComparators.DOUBLE.computePrefix(row.getFloat(pos).toDouble)
- }
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.FLOAT.NULL_PREFIX
+ else PrefixComparators.FLOAT.computePrefix(sortOrder.child.eval(row).asInstanceOf[Float])
}
case DoubleType => (row: InternalRow) => {
- if (row.isNullAt(pos)) {
- PrefixComparators.DOUBLE.NULL_PREFIX
- } else {
- PrefixComparators.DOUBLE.computePrefix(row.getDouble(pos))
- }
+ val exprVal = sortOrder.child.eval(row)
+ if (exprVal == null) PrefixComparators.DOUBLE.NULL_PREFIX
+ else PrefixComparators.DOUBLE.computePrefix(sortOrder.child.eval(row).asInstanceOf[Double])
}
case _ => (row: InternalRow) => 0L
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4ab2c41..f3ef066 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -340,8 +340,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
- TungstenSort.supportsSchema(child.schema)) {
- execution.TungstenSort(sortExprs, global, child)
+ UnsafeExternalSort.supportsSchema(child.schema)) {
+ execution.UnsafeExternalSort(sortExprs, global, child)
} else if (sqlContext.conf.externalSortEnabled) {
execution.ExternalSort(sortExprs, global, child)
} else {
http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index d0ad310..f822088 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -97,7 +97,7 @@ case class ExternalSort(
* @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
* spill every `frequency` records.
*/
-case class TungstenSort(
+case class UnsafeExternalSort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan,
@@ -110,6 +110,7 @@ case class TungstenSort(
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
+ assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
val ordering = newOrdering(sortOrder, child.output)
val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
@@ -148,7 +149,7 @@ case class TungstenSort(
}
@DeveloperApi
-object TungstenSort {
+object UnsafeExternalSort {
/**
* Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index c458f95..7b75f75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -31,7 +31,7 @@ class RowFormatConvertersSuite extends SparkPlanTest {
private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
assert(!outputsSafe.outputsUnsafeRows)
- private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null))
+ private val outputsUnsafe = UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
assert(outputsUnsafe.outputsUnsafeRows)
test("planner should insert unsafe->safe conversions when required") {
http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
index 9cabc4b..7a4baa9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala
@@ -42,7 +42,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false")
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
+ (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
sortAnswers = false
)
@@ -53,7 +53,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
try {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
+ (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)),
(child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
sortAnswers = false
)
@@ -68,7 +68,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
val stringLength = 1024 * 1024 * 2
checkThatPlansAgree(
Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
- TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+ UnsafeExternalSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
Sort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)
@@ -88,11 +88,11 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll {
TestSQLContext.sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
StructType(StructField("a", dataType, nullable = true) :: Nil)
)
- assert(TungstenSort.supportsSchema(inputDf.schema))
+ assert(UnsafeExternalSort.supportsSchema(inputDf.schema))
checkThatPlansAgree(
inputDf,
plan => ConvertToSafe(
- TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
+ UnsafeExternalSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
Sort(sortOrder, global = true, _: SparkPlan),
sortAnswers = false
)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org