You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/08/22 02:12:06 UTC

[spark] branch branch-3.0 updated: [SPARK-32672][SQL] Fix data corruption in boolean bit set compression

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 87f1d51  [SPARK-32672][SQL] Fix data corruption in boolean bit set compression
87f1d51 is described below

commit 87f1d51bcbd758387150a30e2cbc8540a2c6b227
Author: Robert (Bobby) Evans <bo...@apache.org>
AuthorDate: Sat Aug 22 11:07:14 2020 +0900

    [SPARK-32672][SQL] Fix data corruption in boolean bit set compression
    
    ### What changes were proposed in this pull request?
    
    This fixes SPARK-32672, a data corruption issue.  Essentially the BooleanBitSet CompressionScheme would miss nulls at the end of a CompressedBatch.  The values would then default to false.
    
    ### Why are the changes needed?
    It fixes data corruption
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    I manually tested it against the original issue that was producing errors for me.  I also added a unit test.
    
    Closes #29506 from revans2/SPARK-32672.
    
    Authored-by: Robert (Bobby) Evans <bo...@apache.org>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 12f4331b9eb563cb0cfbf6a241d1d085ca4f7676)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../columnar/compression/compressionSchemes.scala  |  6 ++---
 .../columnar/compression/BooleanBitSetSuite.scala  | 26 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 00a1d54..3cc59af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -318,7 +318,8 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
       var valueCountLocal = 0
       var currentValueLocal: Long = 0
 
-      while (valueCountLocal < runLocal || (pos < capacity)) {
+      while (pos < capacity) {
+        assert(valueCountLocal <= runLocal)
         if (pos != nextNullIndex) {
           if (valueCountLocal == runLocal) {
             currentValueLocal = getFunction(buffer)
@@ -616,7 +617,6 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
     override def hasNext: Boolean = visited < count
 
     override def decompress(columnVector: WritableColumnVector, capacity: Int): Unit = {
-      val countLocal = count
       var currentWordLocal: Long = 0
       var visitedLocal: Int = 0
       val nullsBuffer = buffer.duplicate().order(ByteOrder.nativeOrder())
@@ -626,7 +626,7 @@ private[columnar] case object BooleanBitSet extends CompressionScheme {
       var pos = 0
       var seenNulls = 0
 
-      while (visitedLocal < countLocal) {
+      while (pos < capacity) {
         if (pos != nextNullIndex) {
           val bit = visitedLocal % BITS_PER_LONG
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
index 192db0e..111a620d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala
@@ -156,4 +156,30 @@ class BooleanBitSetSuite extends SparkFunSuite {
   test(s"$BooleanBitSet: multiple words and 1 more bit for decompression()") {
     skeletonForDecompress(BITS_PER_LONG * 2 + 1)
   }
+
+  test(s"$BooleanBitSet: Only nulls for decompression()") {
+    val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet)
+    val numRows = 10
+
+    val rows = Seq.fill[InternalRow](numRows)({
+      val row = new GenericInternalRow(1)
+      row.setNullAt(0)
+      row
+    })
+    rows.foreach(builder.appendFrom(_, 0))
+    val buffer = builder.build()
+
+    // Rewinds, skips column header and 4 more bytes for compression scheme ID
+    val headerSize = CompressionScheme.columnHeaderSize(buffer)
+    buffer.position(headerSize)
+    assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+    val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
+    val columnVector = new OnHeapColumnVector(numRows, BooleanType)
+    decoder.decompress(columnVector, numRows)
+
+    (0 until numRows).foreach { rowNum =>
+      assert(columnVector.isNullAt(rowNum))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org