You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/08/22 02:12:06 UTC
[spark] branch branch-3.0 updated: [SPARK-32672][SQL] Fix data
corruption in boolean bit set compression
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 87f1d51 [SPARK-32672][SQL] Fix data corruption in boolean bit set compression
87f1d51 is described below
commit 87f1d51bcbd758387150a30e2cbc8540a2c6b227
Author: Robert (Bobby) Evans <bo...@apache.org>
AuthorDate: Sat Aug 22 11:07:14 2020 +0900
[SPARK-32672][SQL] Fix data corruption in boolean bit set compression
## What changes were proposed in this pull request?
This fixes SPARK-32672, a data corruption issue. Essentially, the BooleanBitSet CompressionScheme would miss nulls at the end of a CompressedBatch. The values would then default to false.
### Why are the changes needed?
It fixes a data corruption bug.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
I manually tested it against the original issue that was producing errors for me. I also added a unit test.
Closes #29506 from revans2/SPARK-32672.
Authored-by: Robert (Bobby) Evans <bo...@apache.org>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 12f4331b9eb563cb0cfbf6a241d1d085ca4f7676)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../columnar/compression/compressionSchemes.scala | 6 ++---
.../columnar/compression/BooleanBitSetSuite.scala | 26 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 00a1d54..3cc59af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -318,7 +318,8 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
var valueCountLocal = 0
var currentValueLocal: Long = 0
- while (valueCountLocal < runLocal || (pos < capacity)) {
+ while (pos < capacity) {
+ assert(valueCountLocal <= runLocal)
if (pos != nextNullIndex) {
if (valueCountLocal == runLocal) {
currentValueLocal = getFunction(buffer)
@@ -616,7 +617,6 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
override def hasNext: Boolean = visited < count
override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = {
- val countLocal = count
var currentWordLocal: Long = 0
var visitedLocal: Int = 0
val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
@@ -626,7 +626,7 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
var pos = 0
var seenNulls = 0
- while (visitedLocal < countLocal) {
+ while (pos < capacity) {
if (pos != nextNullIndex) {
val bit = visitedLocal % BITS_PER_LONG
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index 192db0e..111a620d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -156,4 +156,30 @@ class BooleanBitSetSuite extends SparkFunSuite {
test(s"$BooleanBitSet: multiple words and 1 more bit for decompression()") {
skeletonForDecompress(BITS_PER_LONG * 2 + 1)
}
+
+ test(s"$BooleanBitSet: Only nulls for decompression()") {
+ val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet)
+ val numRows = 10
+
+ val rows = Seq.fill[InternalRow](numRows)({
+ val row = new GenericInternalRow(1)
+ row.setNullAt(0)
+ row
+ })
+ rows.foreach(builder.appendFrom(_, 0))
+ val buffer = builder.build()
+
+ // Rewinds, skips column header and 4 more bytes for compression scheme ID
+ val headerSize = CompressionScheme.columnHeaderSize(buffer)
+ buffer.position(headerSize)
+ assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+ val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
+ val columnVector = new OnHeapColumnVector(numRows, BooleanType)
+ decoder.decompress(columnVector, numRows)
+
+ (0 until numRows).foreach { rowNum =>
+ assert(columnVector.isNullAt(rowNum))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org