Posted to commits@spark.apache.org by da...@apache.org on 2015/12/09 06:32:36 UTC

spark git commit: [SPARK-12222] [CORE] Deserializing RoaringBitmap using the Kryo serializer throws a Buffer underflow exception

Repository: spark
Updated Branches:
  refs/heads/master a0046e379 -> 3934562d3


[SPARK-12222] [CORE] Deserializing RoaringBitmap using the Kryo serializer throws a Buffer underflow exception

Jira: https://issues.apache.org/jira/browse/SPARK-12222

Deserializing a RoaringBitmap using the Kryo serializer throws a Buffer underflow exception:
```
com.esotericsoftware.kryo.KryoException: Buffer underflow.
	at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
	at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
	at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
```

This is caused by a bug in Kryo's `Input.skip(long count)` (https://github.com/EsotericSoftware/kryo/issues/119), which we call in `KryoInputDataInputBridge`.
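
As I read the referenced Kryo issue, the chunk size in `Input.skip(long count)` is computed with `Math.max` where `Math.min` was intended, so every iteration asks the buffer for `Integer.MAX_VALUE` bytes. The sketch below is an illustrative Scala transcription of that behavior under this assumption, not the verbatim Kryo source:
```scala
import com.esotericsoftware.kryo.io.{Input => KryoInput}

// Illustrative transcription of the problematic Kryo 2.21 Input.skip(long):
// Math.max always picks Integer.MAX_VALUE as the chunk size, so the
// int-based skip(int) demands ~2GB of readable bytes and fails with
// "com.esotericsoftware.kryo.KryoException: Buffer underflow."
def buggySkip(input: KryoInput, count: Long): Long = {
  var remaining = count
  while (remaining > 0) {
    val chunk = Math.max(Integer.MAX_VALUE, remaining.toInt) // bug: should be Math.min
    input.skip(chunk) // throws KryoException for any realistically sized input
    remaining -= chunk
  }
  count
}
```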

Instead of upgrading Kryo, this PR bypasses Kryo's `Input.skip(long count)` by directly calling the other `skip` method in Kryo's Input.java (https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. it reimplements the bug-fixed version of `Input.skip(long count)` in `KryoInputDataInputBridge`'s `skipBytes` method.

More details: https://github.com/apache/spark/pull/9748#issuecomment-162860246
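
As a quick illustration, here is a self-contained Scala sketch of the same chunked-skip approach the patch adds to `KryoInputDataInputBridge.skipBytes` (the object name and the tiny in-memory check are illustrative, not part of the patch):
```scala
import com.esotericsoftware.kryo.io.{Input => KryoInput}

// Chunk the requested count through the int-based Input.skip(int) and
// never touch the buggy Input.skip(long) overload.
object SkipBytesSketch {
  def skipBytes(input: KryoInput, n: Int): Int = {
    var remaining: Long = n
    while (remaining > 0) {
      val chunk = Math.min(Integer.MAX_VALUE, remaining).toInt
      input.skip(chunk) // int overload: discards exactly `chunk` bytes
      remaining -= chunk
    }
    n
  }

  def main(args: Array[String]): Unit = {
    // Skip 10 of 16 bytes backed by an in-memory buffer; position advances to 10.
    val input = new KryoInput(Array.fill[Byte](16)(0))
    assert(skipBytes(input, 10) == 10)
    assert(input.position() == 10)
  }
}
```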

Author: Fei Wang <wa...@huawei.com>

Closes #10213 from scwf/patch-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3934562d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3934562d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3934562d

Branch: refs/heads/master
Commit: 3934562d34bbe08d91c54b4bbee27870e93d7571
Parents: a0046e3
Author: Fei Wang <wa...@huawei.com>
Authored: Tue Dec 8 21:32:31 2015 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Tue Dec 8 21:32:31 2015 -0800

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 10 ++++++-
 .../spark/serializer/KryoSerializerSuite.scala  | 28 +++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3934562d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 62d445f..cb2ac5e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -400,7 +400,15 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat
   override def readUTF(): String = input.readString() // readString in kryo does utf8
   override def readInt(): Int = input.readInt()
   override def readUnsignedShort(): Int = input.readShortUnsigned()
-  override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
+  override def skipBytes(n: Int): Int = {
+    var remaining: Long = n
+    while (remaining > 0) {
+      val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
+      input.skip(skip)
+      remaining -= skip
+    }
+    n
+  }
   override def readFully(b: Array[Byte]): Unit = input.read(b)
   override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
   override def readLine(): String = throw new UnsupportedOperationException("readLine")

http://git-wip-us.apache.org/repos/asf/spark/blob/3934562d/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index f81fe31..9fcc22b 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -17,17 +17,21 @@
 
 package org.apache.spark.serializer
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream, FileInputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+
+import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
 import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.Utils
 import org.apache.spark.storage.BlockManagerId
 
 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
@@ -350,6 +354,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     assert(thrown.getMessage.contains(kryoBufferMaxProperty))
   }
 
+  test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") {
+    val dir = Utils.createTempDir()
+    val tmpfile = dir.toString + "/RoaringBitmap"
+    val outStream = new FileOutputStream(tmpfile)
+    val output = new KryoOutput(outStream)
+    val bitmap = new RoaringBitmap
+    bitmap.add(1)
+    bitmap.add(3)
+    bitmap.add(5)
+    bitmap.serialize(new KryoOutputDataOutputBridge(output))
+    output.flush()
+    output.close()
+
+    val inStream = new FileInputStream(tmpfile)
+    val input = new KryoInput(inStream)
+    val ret = new RoaringBitmap
+    ret.deserialize(new KryoInputDataInputBridge(input))
+    input.close()
+    assert(ret == bitmap)
+    Utils.deleteRecursively(dir)
+  }
+
   test("getAutoReset") {
     val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
     assert(ser.getAutoReset)

