Posted to issues@spark.apache.org by "Gerard Maas (JIRA)" <ji...@apache.org> on 2016/01/25 09:46:39 UTC

[jira] [Commented] (SPARK-3958) Possible stream-corruption issues in TorrentBroadcast

    [ https://issues.apache.org/jira/browse/SPARK-3958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114906#comment-15114906 ] 

Gerard Maas commented on SPARK-3958:
------------------------------------

[~pwendell] [~joshrosen] We just hit this bug in one of our production jobs using Spark Streaming 1.4.1. Each task spawned by the streaming job fails down the road.
This job has been working fine for months, so I'm not sure we can narrow down the conditions needed to reproduce it.

Here's the exception:

{code}
[Stage 16049:(0 + 0) / 24][Stage 16056:(0 + 0) / 24][Stage 16058:(0 + 0) / 24]Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 17478.0 failed 6 times, most recent failure: Lost task 23.5 in stage 17478.0 (TID 172352, dnode-6.hdfs.private): java.io.IOException: PARSING_ERROR(2)
	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
	at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
	at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
	at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
	at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:387)
	at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2589)
	at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2599)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
	at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:200)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:197)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:70)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

(Murphy's law: Bugs reappear the moment you close them)

> Possible stream-corruption issues in TorrentBroadcast
> -----------------------------------------------------
>
>                 Key: SPARK-3958
>                 URL: https://issues.apache.org/jira/browse/SPARK-3958
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Blocker
>         Attachments: spark_ex.logs
>
>
> TorrentBroadcast deserialization sometimes fails with decompression errors, which are most likely caused by stream-corruption exceptions.  For example, this can manifest itself as a Snappy PARSING_ERROR when deserializing a broadcasted task:
> {code}
> 14/10/14 17:20:55.016 DEBUG BlockManager: Getting local block broadcast_8
> 14/10/14 17:20:55.016 DEBUG BlockManager: Block broadcast_8 not registered locally
> 14/10/14 17:20:55.016 INFO TorrentBroadcast: Started reading broadcast variable 8
> 14/10/14 17:20:55.017 INFO TorrentBroadcast: Reading broadcast variable 8 took 5.3433E-5 s
> 14/10/14 17:20:55.017 ERROR Executor: Exception in task 2.0 in stage 8.0 (TID 18)
> java.io.IOException: PARSING_ERROR(2)
> 	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> 	at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
> 	at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
> 	at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
> 	at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
> 	at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
> 	at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
> 	at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
> 	at org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:170)
> 	at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> 	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> SPARK-3630 is an umbrella ticket for investigating all causes of these Kryo and Snappy deserialization errors.  This ticket is for a more narrowly-focused exploration of the TorrentBroadcast version of these errors, since the similar errors that we've seen in sort-based shuffle seem to be explained by a different cause (see SPARK-3948).
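
For readers unfamiliar with this failure mode, here is a minimal sketch of how a corrupted compressed block surfaces as an IOException at read time. It is not Spark code and does not reproduce this bug. It assumes GZIP from the JDK as a stand-in for Snappy (so it runs without snappy-java), and the class and method names (CorruptStreamDemo, writeCompressed, corruptHeader, readsBack) are hypothetical. The only point it illustrates is that the codec rejects damaged bytes when the stream is opened or read, which is the same place the traces above fail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; GZIP is an assumption standing in for Snappy.
final class CorruptStreamDemo {

    // Java-serialize a value and compress it into a single in-memory block.
    static byte[] writeCompressed(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(bytes))) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Return a copy of the block with its first header byte flipped,
    // simulating corruption somewhere between the writer and the reader.
    static byte[] corruptHeader(byte[] block) {
        byte[] copy = block.clone();
        copy[0] ^= 0x7F;
        return copy;
    }

    // True if the block decompresses and deserializes to the expected value;
    // false if the codec or the deserializer rejects it with an IOException.
    static boolean readsBack(byte[] block, String expected) {
        try (ObjectInputStream in = new ObjectInputStream(
                new GZIPInputStream(new ByteArrayInputStream(block)))) {
            return expected.equals(in.readObject());
        } catch (IOException e) {
            // With GZIP this is a ZipException raised while the header is parsed.
            return false;
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] block = writeCompressed("broadcast payload");
        System.out.println("intact block readable:    " + readsBack(block, "broadcast payload"));
        System.out.println("corrupted block readable: "
                + readsBack(corruptHeader(block), "broadcast payload"));
    }
}
```

Note that the sketch corrupts the header so the failure is deterministic. Corruption in the middle of a block would instead show up on a later read, as in the hasNextChunk frame of the shuffle trace in the comment.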


