Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/10/03 15:47:20 UTC

[jira] [Commented] (SPARK-17766) Write ahead log corruption on a toy project

    [ https://issues.apache.org/jira/browse/SPARK-17766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15542709#comment-15542709 ] 

Sean Owen commented on SPARK-17766:
-----------------------------------

Where is the corruption here, though? This is just an exception you get while killing the stream. The bad part is that the actual exception doesn't get deserialized by Kryo because its class isn't registered. You could temporarily turn off Kryo registration to see what's really going on.
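For reference, a minimal sketch of what that could look like in the reporter's {{createContext}} below. The exact settings are my assumption of what "turn off Kryo registration" means here, not a confirmed fix:

{code}
import org.apache.spark.SparkConf

// Sketch only, for debugging; these settings are an assumption, not a confirmed fix.
val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
// Option 1: don't require every class to be registered with Kryo.
conf.set("spark.kryo.registrationRequired", "false")
// Option 2: temporarily use the plain Java serializer instead of Kryo,
// so the surfaced exception isn't hidden behind an unregistered-class error.
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
{code}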

> Write ahead log corruption on a toy project
> -------------------------------------------
>
>                 Key: SPARK-17766
>                 URL: https://issues.apache.org/jira/browse/SPARK-17766
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 2.0.0
>            Reporter: Nadav Samet
>
> The write-ahead log seems to get corrupted when the application is stopped abruptly (Ctrl-C or kill). Afterwards, the application refuses to start, failing with this exception:
> {code}
> 2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
> ...skipping...
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Code:
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.spark._
> import org.apache.spark.streaming._
> object ProtoDemo {
>   def createContext(dirName: String) = {
>     val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
>     conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>     /*
>     conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
>     conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
>     */
>     val ssc = new StreamingContext(conf, Seconds(1))
>     ssc.checkpoint(dirName)
>     val lines = ssc.socketTextStream("127.0.0.1", 9999)
>     val words = lines.flatMap(_.split(" "))
>     val pairs = words.map(word => (word, 1))
>     val wordCounts = pairs.reduceByKey(_ + _)
>     val runningCounts = wordCounts.updateStateByKey[Int] {
>       (values: Seq[Int], oldValue: Option[Int]) =>
>         val s = values.sum
>         Some(oldValue.fold(s)(_ + s))
>     }
>     // Print the first ten elements of each RDD generated in this DStream to the console
>     runningCounts.print()
>     ssc
>   }
>   def main(args: Array[String]) = {
>     val hadoopConf = new Configuration()
>     val dirName = "/tmp/chkp"
>     val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> Steps to reproduce:
> 1. Clone the code repository: {{git clone https://github.com/thesamet/spark-issue}}
> 2. In one terminal, run: {{while true; do nc -l localhost 9999; done}}
> 3. Start a new terminal.
> 4. Run {{sbt run}}.
> 5. Type a few lines in the netcat terminal.
> 6. Kill the streaming application (Ctrl-C).
> 7. Go back to step 4 until you see the exception above.
> I tried the above with the local filesystem and also with S3, and got the same result.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org