Posted to commits@spark.apache.org by we...@apache.org on 2018/01/05 02:16:40 UTC

spark git commit: [SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt

Repository: spark
Updated Branches:
  refs/heads/master 0428368c2 -> df7fc3ef3


[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt

## What changes were proposed in this pull request?

A 32-bit Int was used for the row rank.
That overflows on DataFrames with more than 2 billion rows, so the rank and the g/delta fields of Stats are widened to Long.
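
For context, a minimal sketch of the failure mode (illustrative numbers, not taken from the patch): converting the computed rank from Double to Int saturates at Int.MaxValue once the row count passes ~2.1B, so the target rank is silently wrong.

```scala
val count = 3000000000L                     // more rows than Int.MaxValue
val asInt  = math.ceil(0.9 * count).toInt   // 2147483647: clamped to Int.MaxValue
val asLong = math.ceil(0.9 * count).toLong  // 2700000000: the intended rank
```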

## How was this patch tested?

Added a test, but left it ignored, as it takes 4 minutes to run.

Author: Juliusz Sompolski <ju...@databricks.com>

Closes #20152 from juliuszsompolski/SPARK-22957.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df7fc3ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df7fc3ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df7fc3ef

Branch: refs/heads/master
Commit: df7fc3ef3899cadd252d2837092bebe3442d6523
Parents: 0428368
Author: Juliusz Sompolski <ju...@databricks.com>
Authored: Fri Jan 5 10:16:34 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jan 5 10:16:34 2018 +0800

----------------------------------------------------------------------
 .../expressions/aggregate/ApproximatePercentile.scala   | 12 ++++++------
 .../spark/sql/catalyst/util/QuantileSummaries.scala     |  8 ++++----
 .../scala/org/apache/spark/sql/DataFrameStatSuite.scala |  8 ++++++++
 3 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df7fc3ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
index 149ac26..a45854a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
@@ -296,8 +296,8 @@ object ApproximatePercentile {
       Ints.BYTES + Doubles.BYTES + Longs.BYTES +
       // length of summary.sampled
       Ints.BYTES +
-      // summary.sampled, Array[Stat(value: Double, g: Int, delta: Int)]
-      summaries.sampled.length * (Doubles.BYTES + Ints.BYTES + Ints.BYTES)
+      // summary.sampled, Array[Stat(value: Double, g: Long, delta: Long)]
+      summaries.sampled.length * (Doubles.BYTES + Longs.BYTES + Longs.BYTES)
     }
 
     final def serialize(obj: PercentileDigest): Array[Byte] = {
@@ -312,8 +312,8 @@ object ApproximatePercentile {
       while (i < summary.sampled.length) {
         val stat = summary.sampled(i)
         buffer.putDouble(stat.value)
-        buffer.putInt(stat.g)
-        buffer.putInt(stat.delta)
+        buffer.putLong(stat.g)
+        buffer.putLong(stat.delta)
         i += 1
       }
       buffer.array()
@@ -330,8 +330,8 @@ object ApproximatePercentile {
       var i = 0
       while (i < sampledLength) {
         val value = buffer.getDouble()
-        val g = buffer.getInt()
-        val delta = buffer.getInt()
+        val g = buffer.getLong()
+        val delta = buffer.getLong()
         sampled(i) = Stats(value, g, delta)
         i += 1
       }
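
With g and delta widened to Long, each serialized sample costs 8 + 8 + 8 = 24 bytes rather than 16, which is what the updated length computation above reflects. A self-contained round-trip sketch of the new layout, using a local stand-in for Stats rather than the Spark class:

```scala
import java.nio.ByteBuffer

case class Stats(value: Double, g: Long, delta: Long)   // mirrors the patched case class

val stat = Stats(1.5, 3000000000L, 42L)                 // g no longer fits in an Int
val buf = ByteBuffer.allocate(8 + 8 + 8)                // Double + Long + Long
buf.putDouble(stat.value).putLong(stat.g).putLong(stat.delta)
buf.flip()
val decoded = Stats(buf.getDouble(), buf.getLong(), buf.getLong())
assert(decoded == stat)                                 // survives the round trip
```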

http://git-wip-us.apache.org/repos/asf/spark/blob/df7fc3ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index eb7941c..b013add 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -105,7 +105,7 @@ class QuantileSummaries(
         if (newSamples.isEmpty || (sampleIdx == sampled.length && opsIdx == sorted.length - 1)) {
           0
         } else {
-          math.floor(2 * relativeError * currentCount).toInt
+          math.floor(2 * relativeError * currentCount).toLong
         }
 
       val tuple = Stats(currentSample, 1, delta)
@@ -192,10 +192,10 @@ class QuantileSummaries(
     }
 
     // Target rank
-    val rank = math.ceil(quantile * count).toInt
+    val rank = math.ceil(quantile * count).toLong
     val targetError = relativeError * count
     // Minimum rank at current sample
-    var minRank = 0
+    var minRank = 0L
     var i = 0
     while (i < sampled.length - 1) {
       val curSample = sampled(i)
@@ -235,7 +235,7 @@ object QuantileSummaries {
    * @param g the minimum rank jump from the previous value's minimum rank
    * @param delta the maximum span of the rank.
    */
-  case class Stats(value: Double, g: Int, delta: Int)
+  case class Stats(value: Double, g: Long, delta: Long)
 
   private def compressImmut(
       currentSamples: IndexedSeq[Stats],
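
The widening matters on the query side too: minRank accumulates g across all samples, so for a multi-billion-row summary it has to hold values beyond 2^31 - 1. A condensed sketch of that scan (names mirror the patch, but the loop is simplified from QuantileSummaries.query and should be read as illustrative):

```scala
case class Stats(value: Double, g: Long, delta: Long)

def queryRank(sampled: Array[Stats], rank: Long, targetError: Double): Double = {
  var minRank = 0L                      // Long, as in the patch: sums of g can exceed Int range
  var i = 0
  while (i < sampled.length - 1) {
    val cur = sampled(i)
    minRank += cur.g                    // minimum possible rank of cur.value
    val maxRank = minRank + cur.delta   // maximum possible rank of cur.value
    // accept the first sample whose rank uncertainty window covers the target
    if (maxRank - targetError <= rank && rank <= minRank + targetError) {
      return cur.value
    }
    i += 1
  }
  sampled.last.value                    // fall back to the largest sample
}
```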

http://git-wip-us.apache.org/repos/asf/spark/blob/df7fc3ef/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 46b21c3..5169d2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -260,6 +260,14 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
     assert(res2(1).isEmpty)
   }
 
+  // SPARK-22957: check for 32bit overflow when computing rank.
+  // ignored - takes 4 minutes to run.
+  ignore("approx quantile 4: test for Int overflow") {
+    val res = spark.range(3000000000L).stat.approxQuantile("id", Array(0.8, 0.9), 0.05)
+    assert(res(0) > 2200000000.0)
+    assert(res(1) > 2200000000.0)
+  }
+
   test("crosstab") {
     withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
       val rng = new Random()
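
As a side note on the asserted 2.2e9 thresholds (my arithmetic, not from the patch): since spark.range(n) holds the values 0 to n - 1, a value's rank equals the value itself, and with relativeError = 0.05 over 3e9 rows the returned quantile can sit at most 0.05 * 3e9 ranks below the target:

```scala
val n = 3000000000L                           // spark.range(3000000000L)
val lowest80 = ((0.80 - 0.05) * n).toLong     // 2250000000: worst-case 0.8-quantile
val lowest90 = ((0.90 - 0.05) * n).toLong     // 2550000000: worst-case 0.9-quantile
assert(lowest80 > 2200000000L && lowest90 > 2200000000L)  // so both asserts must pass
```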

