You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Davies Liu (JIRA)" <ji...@apache.org> on 2015/12/09 06:34:10 UTC

[jira] [Resolved] (SPARK-12222) deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception

     [ https://issues.apache.org/jira/browse/SPARK-12222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Davies Liu resolved SPARK-12222.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 1.6.0
                   2.0.0

Issue resolved by pull request 10213
[https://github.com/apache/spark/pull/10213]

> deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-12222
>                 URL: https://issues.apache.org/jira/browse/SPARK-12222
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.0
>            Reporter: Fei Wang
>             Fix For: 2.0.0, 1.6.0
>
>
> here are some problems when deserialize RoaringBitmap. see the examples below:
> run this piece of code
> ```
> import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
> import java.io.DataInput
>     class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
>       override def readLong(): Long = input.readLong()
>       override def readChar(): Char = input.readChar()
>       override def readFloat(): Float = input.readFloat()
>       override def readByte(): Byte = input.readByte()
>       override def readShort(): Short = input.readShort()
>       override def readUTF(): String = input.readString() // readString in kryo does utf8
>       override def readInt(): Int = input.readInt()
>       override def readUnsignedShort(): Int = input.readShortUnsigned()
>       override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
>       override def readFully(b: Array[Byte]): Unit = input.read(b)
>       override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
>       override def readLine(): String = throw new UnsupportedOperationException("readLine")
>       override def readBoolean(): Boolean = input.readBoolean()
>       override def readUnsignedByte(): Int = input.readByteUnsigned()
>       override def readDouble(): Double = input.readDouble()
>     }
>     class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
>       override def writeFloat(v: Float): Unit = output.writeFloat(v)
>       // There is no "readChars" counterpart, except maybe "readLine", which is not supported
>       override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
>       override def writeDouble(v: Double): Unit = output.writeDouble(v)
>       override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8
>       override def writeShort(v: Int): Unit = output.writeShort(v)
>       override def writeInt(v: Int): Unit = output.writeInt(v)
>       override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
>       override def write(b: Int): Unit = output.write(b)
>       override def write(b: Array[Byte]): Unit = output.write(b)
>       override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)
>       override def writeBytes(s: String): Unit = output.writeString(s)
>       override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
>       override def writeLong(v: Long): Unit = output.writeLong(v)
>       override def writeByte(v: Int): Unit = output.writeByte(v)
>     }
>     val outStream = new FileOutputStream("D:\\wfserde")
>     val output = new KryoOutput(outStream)
>     val bitmap = new RoaringBitmap
>     bitmap.add(1)
>     bitmap.add(3)
>     bitmap.add(5)
>     bitmap.serialize(new KryoOutputDataOutputBridge(output))
>     output.flush()
>     output.close()
>     val inStream = new FileInputStream("D:\\wfserde")
>     val input = new KryoInput(inStream)
>     val ret = new RoaringBitmap
>     ret.deserialize(new KryoInputDataInputBridge(input))
>     println(ret)
> ```
> this will throw `Buffer underflow` error:
> ```
> com.esotericsoftware.kryo.KryoException: Buffer underflow.
> 	at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
> 	at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
> 	at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
> 	at org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes
> ```
> after same investigation,  i found this is caused by a bug of kryo's `Input.skip(long count)`(https://github.com/EsotericSoftware/kryo/issues/119) and we call this method in `KryoInputDataInputBridge`.
> So i think we can fix this issue in this two ways:
> 1) upgrade the kryo version to 2.23.0 or 2.24.0, which has fix this bug in kryo (i am not sure the upgrade is safe in spark, can you check it? @davies )
> 2) we can bypass the  kryo's `Input.skip(long count)` by directly call another `skip` method in kryo's Input.java(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. write the bug-fixed version of `Input.skip(long count)` in KryoInputDataInputBridge's `skipBytes` method:
> ```
>    class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
>       ...
>       override def skipBytes(n: Int): Int = {
>         var remaining: Long = n
>         while (remaining > 0) {
>           val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
>           input.skip(skip)
>           remaining -= skip
>         }
>         n
>      }
>       ...
>     }
> ```



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org