You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/05 02:16:40 UTC
spark git commit: [SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
Repository: spark
Updated Branches:
refs/heads/master 0428368c2 -> df7fc3ef3
[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
## What changes were proposed in this pull request?
A 32-bit Int was used for the row rank.
It overflowed in a DataFrame with more than 2 billion rows.
## How was this patch tested?
Added a test, but marked it as ignored, as it takes 4 minutes to run.
Author: Juliusz Sompolski <ju...@databricks.com>
Closes #20152 from juliuszsompolski/SPARK-22957.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df7fc3ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df7fc3ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df7fc3ef
Branch: refs/heads/master
Commit: df7fc3ef3899cadd252d2837092bebe3442d6523
Parents: 0428368
Author: Juliusz Sompolski <ju...@databricks.com>
Authored: Fri Jan 5 10:16:34 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Jan 5 10:16:34 2018 +0800
----------------------------------------------------------------------
.../expressions/aggregate/ApproximatePercentile.scala | 12 ++++++------
.../spark/sql/catalyst/util/QuantileSummaries.scala | 8 ++++----
.../scala/org/apache/spark/sql/DataFrameStatSuite.scala | 8 ++++++++
3 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/df7fc3ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
index 149ac26..a45854a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
@@ -296,8 +296,8 @@ object ApproximatePercentile {
Ints.BYTES + Doubles.BYTES + Longs.BYTES +
// length of summary.sampled
Ints.BYTES +
- // summary.sampled, Array[Stat(value: Double, g: Int, delta: Int)]
- summaries.sampled.length * (Doubles.BYTES + Ints.BYTES + Ints.BYTES)
+ // summary.sampled, Array[Stat(value: Double, g: Long, delta: Long)]
+ summaries.sampled.length * (Doubles.BYTES + Longs.BYTES + Longs.BYTES)
}
final def serialize(obj: PercentileDigest): Array[Byte] = {
@@ -312,8 +312,8 @@ object ApproximatePercentile {
while (i < summary.sampled.length) {
val stat = summary.sampled(i)
buffer.putDouble(stat.value)
- buffer.putInt(stat.g)
- buffer.putInt(stat.delta)
+ buffer.putLong(stat.g)
+ buffer.putLong(stat.delta)
i += 1
}
buffer.array()
@@ -330,8 +330,8 @@ object ApproximatePercentile {
var i = 0
while (i < sampledLength) {
val value = buffer.getDouble()
- val g = buffer.getInt()
- val delta = buffer.getInt()
+ val g = buffer.getLong()
+ val delta = buffer.getLong()
sampled(i) = Stats(value, g, delta)
i += 1
}
http://git-wip-us.apache.org/repos/asf/spark/blob/df7fc3ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index eb7941c..b013add 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -105,7 +105,7 @@ class QuantileSummaries(
if (newSamples.isEmpty || (sampleIdx == sampled.length && opsIdx == sorted.length - 1)) {
0
} else {
- math.floor(2 * relativeError * currentCount).toInt
+ math.floor(2 * relativeError * currentCount).toLong
}
val tuple = Stats(currentSample, 1, delta)
@@ -192,10 +192,10 @@ class QuantileSummaries(
}
// Target rank
- val rank = math.ceil(quantile * count).toInt
+ val rank = math.ceil(quantile * count).toLong
val targetError = relativeError * count
// Minimum rank at current sample
- var minRank = 0
+ var minRank = 0L
var i = 0
while (i < sampled.length - 1) {
val curSample = sampled(i)
@@ -235,7 +235,7 @@ object QuantileSummaries {
* @param g the minimum rank jump from the previous value's minimum rank
* @param delta the maximum span of the rank.
*/
- case class Stats(value: Double, g: Int, delta: Int)
+ case class Stats(value: Double, g: Long, delta: Long)
private def compressImmut(
currentSamples: IndexedSeq[Stats],
http://git-wip-us.apache.org/repos/asf/spark/blob/df7fc3ef/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 46b21c3..5169d2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -260,6 +260,14 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
assert(res2(1).isEmpty)
}
+ // SPARK-22957: check for 32bit overflow when computing rank.
+ // ignored - takes 4 minutes to run.
+ ignore("approx quantile 4: test for Int overflow") {
+ val res = spark.range(3000000000L).stat.approxQuantile("id", Array(0.8, 0.9), 0.05)
+ assert(res(0) > 2200000000.0)
+ assert(res(1) > 2200000000.0)
+ }
+
test("crosstab") {
withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
val rng = new Random()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org