You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2014/10/15 23:11:34 UTC

[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

    [ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172949#comment-14172949 ] 

Josh Rosen commented on SPARK-3958:
-----------------------------------

Digging into this stacktrace in more detail:

Snappy-java's {{SnappyOutputStream}} writes its own 8-byte header at the beginning of the serialized output.  This header consists of an 8-byte magic value followed by two 4-byte version numbers.  This header is distinct from Snappy's own 6-byte magic number / header.

{{org.xerial.snappy.SnappyInputStream.readHeader}} is implemented like this (in Snappy-Java 1.1.1.3):

{code}
protected void readHeader() throws IOException {
        byte[] header = new byte[SnappyCodec.headerSize()];
        int readBytes = 0;
        while (readBytes < header.length) {
            int ret = in.read(header, readBytes, header.length - readBytes);
            if (ret == -1)
                break;
            readBytes += ret;
        }

        // Quick test of the header 
        if (readBytes < header.length || header[0] != SnappyCodec.MAGIC_HEADER[0]) {
            // do the default uncompression
            readFully(header, readBytes);
            return;
        }

        SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header));
        if (codec.isValidMagicHeader()) {
            // The input data is compressed by SnappyOutputStream
            if (codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) {
                throw new IOException(String.format(
                        "compressed with imcompatible codec version %d. At least version %d is required",
                        codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION));
            }
        }
        else {
            // (probably) compressed by Snappy.compress(byte[])
            readFully(header, readBytes);
            return;
        }
    }
{code}

It starts by attempting to read the 8-byte header.  The first {{while}} loop exits when we've either read 8 bytes of header data or if the input stream was closed before it could read a complete header.  The following code checks whether the header is unexpectedly short or whether it doesn't match the snappy-java magic header.  In our case, we end up taking this branch and calling {{readFully(header, readBytes)}} in order to perform the default Snappy decompression  This is the wrong branch to take (since our data was compressed with a SnappyOutputStream), leading to the PARSING_ERROR.

Based on this, I think that the input data to the SnappyInputStream is somehow being corrupted.  It's not obvious whether this corruption is causing the input data to be too short or whether the start of the stream has the wrong contents.  I'll keep digging and look into adding some size-checking assertions throughout our code.

> Possible stream-corruption issues in TorrentBroadcast
> -----------------------------------------------------
>
>                 Key: SPARK-3958
>                 URL: https://issues.apache.org/jira/browse/SPARK-3958
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>
> TorrentBroadcast deserialization sometimes fails with decompression errors, which are most likely caused by stream-corruption exceptions.  For example, this can manifest itself as a Snappy PARSING_ERROR when deserializing a broadcasted task:
> {code}
> 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8
> 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered locally
> 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast variable 8
> 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 took 5.3433E-5 s
> 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 18)
> java.io.IOException: PARSING_ERROR(2)
> 	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> 	at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
> 	at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
> 	at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
> 	at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
> 	at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
> 	at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
> 	at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
> 	at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
> 	at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> 	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo and Snappy deserialization errors.  This ticket is for a more narrowly-focused exploration of the TorrentBroadcast version of these errors, since the similar errors that we've seen in sort-based shuffle seem to be explained by a different cause (see SPARK-3948).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org