You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Surendranauth Hiraman <su...@velos.io> on 2014/06/17 19:40:06 UTC

Re: Java IO Stream Corrupted - Invalid Type AC?

Matt/Ryan,

Did you make any headway on this? My team is running into this also.
Doesn't happen on smaller datasets. Our input set is about 10 GB but we
generate 100s of GBs in the flow itself.

-Suren




On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <co...@gmail.com> wrote:

> Just ran into this today myself. I'm on branch-1.0 using a CDH3
> cluster (no modifications to Spark or its dependencies). The error
> appeared trying to run GraphX's .connectedComponents() on a ~200GB
> edge list (GraphX worked beautifully on smaller data).
>
> Here's the stacktrace (it's quite similar to yours
> https://imgur.com/7iBA4nJ ).
>
> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
> 4 times; aborting job
> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
> VertexRDD.scala:100
> Exception in thread "main" org.apache.spark.SparkException: Job
> aborted due to stage failure: Task 5.599:39 failed 4 times, most
> recent failure: Exception failure in TID 29735 on host node18:
> java.io.StreamCorruptedException: invalid type code: AC
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>
> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         java.lang.Thread.run(Thread.java:662)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
>
> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mk...@oculusinfo.com>
> wrote:
> >> Im trying run some spark code on a cluster but I keep running into a
> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My task
> >> involves analyzing ~50GB of data (some operations involve sorting) then
> >> writing them out to a JSON file. Im running the analysis on each of the
> >> data's ~10 columns and have never had a successful run. My program
> seems to
> >> run for a varying amount of time each time (~between 5-30 minutes) but
> it
> >> always terminates with this error.
> >
> > I can tell you that this usually means somewhere something wrote
> > objects to the same OutputStream with multiple ObjectOutputStreams. AC
> > is a header value.
> >
> > I don't obviously see where/how that could happen, but maybe it rings
> > a bell for someone. This could happen if an OutputStream is reused
> > across object serializations but new ObjectOutputStreams are opened,
> > for example.
>



-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v <su...@sociocast.com>elos.io
W: www.velos.io

Re: Java IO Stream Corrupted - Invalid Type AC?

Posted by Surendranauth Hiraman <su...@velos.io>.
Good question. At this point, I'd have to re-run it to know for sure. We've
been trying various different things, so I'd have to reset the flow config
back to that state.

I can say that by removing persist(DISK_ONLY), the flows are running more
stably, probably due to removing disk contention. We won't be able to run
our full production flows without some type of disk persistence but for
testing, this is how we are continuing to try for now.

I can try tomorrow if you'd like.

-Suren



On Wed, Jun 18, 2014 at 8:35 PM, Patrick Wendell <pw...@gmail.com> wrote:

> Just wondering, do you get this particular exception if you are not
> consolidating shuffle data?
>
> On Wed, Jun 18, 2014 at 12:15 PM, Mridul Muralidharan <mr...@gmail.com>
> wrote:
> > On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman
> > <su...@velos.io> wrote:
> >> Patrick,
> >>
> >> My team is using shuffle consolidation but not speculation. We are also
> >> using persist(DISK_ONLY) for caching.
> >
> >
> > Use of shuffle consolidation is probably what is causing the issue.
> > Would be good idea to try again with that turned off (which is the
> default).
> >
> > It should get fixed most likely in 1.1 timeframe.
> >
> >
> > Regards,
> > Mridul
> >
> >
> >>
> >> Here are some config changes that are in our work-in-progress.
> >>
> >> We've been trying for 2 weeks to get our production flow (maybe around
> >> 50-70 stages, a few forks and joins with up to 20 branches in the
> forks) to
> >> run end to end without any success, running into other problems besides
> >> this one as well. For example, we have run into situations where saving
> to
> >> HDFS just hangs on a couple of tasks, which are printing out nothing in
> >> their logs and not taking any CPU. For testing, our input data is 10 GB
> >> across 320 input splits and generates maybe around 200-300 GB of
> >> intermediate and final data.
> >>
> >>
> >>         conf.set("spark.executor.memory", "14g")     // TODO make this
> >> configurable
> >>
> >>         // shuffle configs
> >>         conf.set("spark.default.parallelism", "320") // TODO make this
> >> configurable
> >>         conf.set("spark.shuffle.consolidateFiles","true")
> >>
> >>         conf.set("spark.shuffle.file.buffer.kb", "200")
> >>         conf.set("spark.reducer.maxMbInFlight", "96")
> >>
> >>         conf.set("spark.rdd.compress","true"
> >>
> >>         // we ran into a problem with the default timeout of 60 seconds
> >>         // this is also being set in the master's spark-env.sh. Not
> sure if
> >> it needs to be in both places
> >>         conf.set("spark.worker.timeout","180")
> >>
> >>         // akka settings
> >>         conf.set("spark.akka.threads", "300")
> >>         conf.set("spark.akka.timeout", "180")
> >>         conf.set("spark.akka.frameSize", "100")
> >>         conf.set("spark.akka.batchSize", "30")
> >>         conf.set("spark.akka.askTimeout", "30")
> >>
> >>         // block manager
> >>         conf.set("spark.storage.blockManagerTimeoutIntervalMs",
> "180000")
> >>         conf.set("spark.blockManagerHeartBeatMs", "80000")
> >>
> >> -Suren
> >>
> >>
> >>
> >> On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pw...@gmail.com>
> wrote:
> >>
> >>> Out of curiosity - are you guys using speculation, shuffle
> >>> consolidation, or any other non-default option? If so that would help
> >>> narrow down what's causing this corruption.
> >>>
> >>> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
> >>> <su...@velos.io> wrote:
> >>> > Matt/Ryan,
> >>> >
> >>> > Did you make any headway on this? My team is running into this also.
> >>> > Doesn't happen on smaller datasets. Our input set is about 10 GB but
> we
> >>> > generate 100s of GBs in the flow itself.
> >>> >
> >>> > -Suren
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <compton.ryan@gmail.com
> >
> >>> wrote:
> >>> >
> >>> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
> >>> >> cluster (no modifications to Spark or its dependencies). The error
> >>> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
> >>> >> edge list (GraphX worked beautifully on smaller data).
> >>> >>
> >>> >> Here's the stacktrace (it's quite similar to yours
> >>> >> https://imgur.com/7iBA4nJ ).
> >>> >>
> >>> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39
> failed
> >>> >> 4 times; aborting job
> >>> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce
> at
> >>> >> VertexRDD.scala:100
> >>> >> Exception in thread "main" org.apache.spark.SparkException: Job
> >>> >> aborted due to stage failure: Task 5.599:39 failed 4 times, most
> >>> >> recent failure: Exception failure in TID 29735 on host node18:
> >>> >> java.io.StreamCorruptedException: invalid type code: AC
> >>> >>
> >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
> >>> >>
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> >>> >>
> >>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>> >>
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >>> >>
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>> >>
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
> >>> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >>> >>
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>> >>
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> >>> >>
> >>> >>
> >>>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> >>> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
> >>> >>
> >>> >>
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> >>> >>
> >>> >>
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>> >>
> >>> >>
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>> >>         java.lang.Thread.run(Thread.java:662)
> >>> >> Driver stacktrace:
> >>> >> at org.apache.spark.scheduler.DAGScheduler.org
> >>> >>
> >>>
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> >>> >> at
> >>> >>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> >>> >> at
> >>> >>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> >>> >> at
> >>> >>
> >>>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>> >> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>> >> at
> >>> >>
> >>>
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> >>> >> at
> >>> >>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >>> >> at
> >>> >>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >>> >> at scala.Option.foreach(Option.scala:236)
> >>> >> at
> >>> >>
> >>>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> >>> >> at
> >>> >>
> >>>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> >>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >>> >> at
> >>> >>
> >>>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >>> >> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>> >> at
> >>> >>
> >>>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >>> >> at
> >>>
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>> >> at
> >>> >>
> >>>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling
> stage 5
> >>> >>
> >>> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com>
> wrote:
> >>> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mkielo@oculusinfo.com
> >
> >>> >> wrote:
> >>> >> >> Im trying run some spark code on a cluster but I keep running
> into a
> >>> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error.
> My
> >>> task
> >>> >> >> involves analyzing ~50GB of data (some operations involve
> sorting)
> >>> then
> >>> >> >> writing them out to a JSON file. Im running the analysis on each
> of
> >>> the
> >>> >> >> data's ~10 columns and have never had a successful run. My
> program
> >>> >> seems to
> >>> >> >> run for a varying amount of time each time (~between 5-30
> minutes)
> >>> but
> >>> >> it
> >>> >> >> always terminates with this error.
> >>> >> >
> >>> >> > I can tell you that this usually means somewhere something wrote
> >>> >> > objects to the same OutputStream with multiple
> ObjectOutputStreams. AC
> >>> >> > is a header value.
> >>> >> >
> >>> >> > I don't obviously see where/how that could happen, but maybe it
> rings
> >>> >> > a bell for someone. This could happen if an OutputStream is reused
> >>> >> > across object serializations but new ObjectOutputStreams are
> opened,
> >>> >> > for example.
> >>> >>
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> >
> >>> > SUREN HIRAMAN, VP TECHNOLOGY
> >>> > Velos
> >>> > Accelerating Machine Learning
> >>> >
> >>> > 440 NINTH AVENUE, 11TH FLOOR
> >>> > NEW YORK, NY 10001
> >>> > O: (917) 525-2466 ext. 105
> >>> > F: 646.349.4063
> >>> > E: suren.hiraman@v <su...@sociocast.com>elos.io
> >>> > W: www.velos.io
> >>>
> >>
> >>
> >>
> >> --
> >>
> >> SUREN HIRAMAN, VP TECHNOLOGY
> >> Velos
> >> Accelerating Machine Learning
> >>
> >> 440 NINTH AVENUE, 11TH FLOOR
> >> NEW YORK, NY 10001
> >> O: (917) 525-2466 ext. 105
> >> F: 646.349.4063
> >> E: suren.hiraman@v <su...@sociocast.com>elos.io
> >> W: www.velos.io
>



-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v <su...@sociocast.com>elos.io
W: www.velos.io

Re: Java IO Stream Corrupted - Invalid Type AC?

Posted by Patrick Wendell <pw...@gmail.com>.
Just wondering, do you get this particular exception if you are not
consolidating shuffle data?

On Wed, Jun 18, 2014 at 12:15 PM, Mridul Muralidharan <mr...@gmail.com> wrote:
> On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman
> <su...@velos.io> wrote:
>> Patrick,
>>
>> My team is using shuffle consolidation but not speculation. We are also
>> using persist(DISK_ONLY) for caching.
>
>
> Use of shuffle consolidation is probably what is causing the issue.
> Would be good idea to try again with that turned off (which is the default).
>
> It should get fixed most likely in 1.1 timeframe.
>
>
> Regards,
> Mridul
>
>
>>
>> Here are some config changes that are in our work-in-progress.
>>
>> We've been trying for 2 weeks to get our production flow (maybe around
>> 50-70 stages, a few forks and joins with up to 20 branches in the forks) to
>> run end to end without any success, running into other problems besides
>> this one as well. For example, we have run into situations where saving to
>> HDFS just hangs on a couple of tasks, which are printing out nothing in
>> their logs and not taking any CPU. For testing, our input data is 10 GB
>> across 320 input splits and generates maybe around 200-300 GB of
>> intermediate and final data.
>>
>>
>>         conf.set("spark.executor.memory", "14g")     // TODO make this
>> configurable
>>
>>         // shuffle configs
>>         conf.set("spark.default.parallelism", "320") // TODO make this
>> configurable
>>         conf.set("spark.shuffle.consolidateFiles","true")
>>
>>         conf.set("spark.shuffle.file.buffer.kb", "200")
>>         conf.set("spark.reducer.maxMbInFlight", "96")
>>
>>         conf.set("spark.rdd.compress","true"
>>
>>         // we ran into a problem with the default timeout of 60 seconds
>>         // this is also being set in the master's spark-env.sh. Not sure if
>> it needs to be in both places
>>         conf.set("spark.worker.timeout","180")
>>
>>         // akka settings
>>         conf.set("spark.akka.threads", "300")
>>         conf.set("spark.akka.timeout", "180")
>>         conf.set("spark.akka.frameSize", "100")
>>         conf.set("spark.akka.batchSize", "30")
>>         conf.set("spark.akka.askTimeout", "30")
>>
>>         // block manager
>>         conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
>>         conf.set("spark.blockManagerHeartBeatMs", "80000")
>>
>> -Suren
>>
>>
>>
>> On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pw...@gmail.com> wrote:
>>
>>> Out of curiosity - are you guys using speculation, shuffle
>>> consolidation, or any other non-default option? If so that would help
>>> narrow down what's causing this corruption.
>>>
>>> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
>>> <su...@velos.io> wrote:
>>> > Matt/Ryan,
>>> >
>>> > Did you make any headway on this? My team is running into this also.
>>> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we
>>> > generate 100s of GBs in the flow itself.
>>> >
>>> > -Suren
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <co...@gmail.com>
>>> wrote:
>>> >
>>> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
>>> >> cluster (no modifications to Spark or its dependencies). The error
>>> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
>>> >> edge list (GraphX worked beautifully on smaller data).
>>> >>
>>> >> Here's the stacktrace (it's quite similar to yours
>>> >> https://imgur.com/7iBA4nJ ).
>>> >>
>>> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
>>> >> 4 times; aborting job
>>> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
>>> >> VertexRDD.scala:100
>>> >> Exception in thread "main" org.apache.spark.SparkException: Job
>>> >> aborted due to stage failure: Task 5.599:39 failed 4 times, most
>>> >> recent failure: Exception failure in TID 29735 on host node18:
>>> >> java.io.StreamCorruptedException: invalid type code: AC
>>> >>
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
>>> >>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>>> >>
>>> >>
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>> >>
>>> >>
>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>>> >>
>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> >>
>>> >>
>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>>> >>
>>> >>
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>
>>> >>
>>> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>>> >>
>>> >>
>>> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>>> >>
>>> >>
>>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>>> >>
>>> >>
>>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>>> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >>
>>> >>
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>>> >>
>>> >>
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>>> >>
>>> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>> >>
>>> >>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>> >>
>>> >>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>> >>         java.lang.Thread.run(Thread.java:662)
>>> >> Driver stacktrace:
>>> >> at org.apache.spark.scheduler.DAGScheduler.org
>>> >>
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>>> >> at
>>> >>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>>> >> at
>>> >>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>>> >> at
>>> >>
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> >> at
>>> >>
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>>> >> at
>>> >>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>>> >> at
>>> >>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>>> >> at scala.Option.foreach(Option.scala:236)
>>> >> at
>>> >>
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>>> >> at
>>> >>
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>> >> at
>>> >>
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> >> at
>>> >>
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> >> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> >> at
>>> >>
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
>>> >>
>>> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
>>> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mk...@oculusinfo.com>
>>> >> wrote:
>>> >> >> Im trying run some spark code on a cluster but I keep running into a
>>> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My
>>> task
>>> >> >> involves analyzing ~50GB of data (some operations involve sorting)
>>> then
>>> >> >> writing them out to a JSON file. Im running the analysis on each of
>>> the
>>> >> >> data's ~10 columns and have never had a successful run. My program
>>> >> seems to
>>> >> >> run for a varying amount of time each time (~between 5-30 minutes)
>>> but
>>> >> it
>>> >> >> always terminates with this error.
>>> >> >
>>> >> > I can tell you that this usually means somewhere something wrote
>>> >> > objects to the same OutputStream with multiple ObjectOutputStreams. AC
>>> >> > is a header value.
>>> >> >
>>> >> > I don't obviously see where/how that could happen, but maybe it rings
>>> >> > a bell for someone. This could happen if an OutputStream is reused
>>> >> > across object serializations but new ObjectOutputStreams are opened,
>>> >> > for example.
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> >
>>> > SUREN HIRAMAN, VP TECHNOLOGY
>>> > Velos
>>> > Accelerating Machine Learning
>>> >
>>> > 440 NINTH AVENUE, 11TH FLOOR
>>> > NEW YORK, NY 10001
>>> > O: (917) 525-2466 ext. 105
>>> > F: 646.349.4063
>>> > E: suren.hiraman@v <su...@sociocast.com>elos.io
>>> > W: www.velos.io
>>>
>>
>>
>>
>> --
>>
>> SUREN HIRAMAN, VP TECHNOLOGY
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR
>> NEW YORK, NY 10001
>> O: (917) 525-2466 ext. 105
>> F: 646.349.4063
>> E: suren.hiraman@v <su...@sociocast.com>elos.io
>> W: www.velos.io

Re: Java IO Stream Corrupted - Invalid Type AC?

Posted by Mridul Muralidharan <mr...@gmail.com>.
On Wed, Jun 18, 2014 at 6:19 PM, Surendranauth Hiraman
<su...@velos.io> wrote:
> Patrick,
>
> My team is using shuffle consolidation but not speculation. We are also
> using persist(DISK_ONLY) for caching.


Use of shuffle consolidation is probably what is causing the issue.
Would be good idea to try again with that turned off (which is the default).

It should get fixed most likely in 1.1 timeframe.


Regards,
Mridul


>
> Here are some config changes that are in our work-in-progress.
>
> We've been trying for 2 weeks to get our production flow (maybe around
> 50-70 stages, a few forks and joins with up to 20 branches in the forks) to
> run end to end without any success, running into other problems besides
> this one as well. For example, we have run into situations where saving to
> HDFS just hangs on a couple of tasks, which are printing out nothing in
> their logs and not taking any CPU. For testing, our input data is 10 GB
> across 320 input splits and generates maybe around 200-300 GB of
> intermediate and final data.
>
>
>         conf.set("spark.executor.memory", "14g")     // TODO make this
> configurable
>
>         // shuffle configs
>         conf.set("spark.default.parallelism", "320") // TODO make this
> configurable
>         conf.set("spark.shuffle.consolidateFiles","true")
>
>         conf.set("spark.shuffle.file.buffer.kb", "200")
>         conf.set("spark.reducer.maxMbInFlight", "96")
>
>         conf.set("spark.rdd.compress","true"
>
>         // we ran into a problem with the default timeout of 60 seconds
>         // this is also being set in the master's spark-env.sh. Not sure if
> it needs to be in both places
>         conf.set("spark.worker.timeout","180")
>
>         // akka settings
>         conf.set("spark.akka.threads", "300")
>         conf.set("spark.akka.timeout", "180")
>         conf.set("spark.akka.frameSize", "100")
>         conf.set("spark.akka.batchSize", "30")
>         conf.set("spark.akka.askTimeout", "30")
>
>         // block manager
>         conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
>         conf.set("spark.blockManagerHeartBeatMs", "80000")
>
> -Suren
>
>
>
> On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pw...@gmail.com> wrote:
>
>> Out of curiosity - are you guys using speculation, shuffle
>> consolidation, or any other non-default option? If so that would help
>> narrow down what's causing this corruption.
>>
>> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
>> <su...@velos.io> wrote:
>> > Matt/Ryan,
>> >
>> > Did you make any headway on this? My team is running into this also.
>> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we
>> > generate 100s of GBs in the flow itself.
>> >
>> > -Suren
>> >
>> >
>> >
>> >
>> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <co...@gmail.com>
>> wrote:
>> >
>> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
>> >> cluster (no modifications to Spark or its dependencies). The error
>> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
>> >> edge list (GraphX worked beautifully on smaller data).
>> >>
>> >> Here's the stacktrace (it's quite similar to yours
>> >> https://imgur.com/7iBA4nJ ).
>> >>
>> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
>> >> 4 times; aborting job
>> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
>> >> VertexRDD.scala:100
>> >> Exception in thread "main" org.apache.spark.SparkException: Job
>> >> aborted due to stage failure: Task 5.599:39 failed 4 times, most
>> >> recent failure: Exception failure in TID 29735 on host node18:
>> >> java.io.StreamCorruptedException: invalid type code: AC
>> >>
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
>> >>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>> >>
>> >>
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>> >>
>> >>
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>> >>
>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> >>
>> >>
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>> >>
>> >>
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>
>> >>
>> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>> >>
>> >>
>> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>> >>
>> >>
>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>> >>
>> >>
>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>
>> >>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>> >>
>> >>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>> >>
>> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>> >>
>> >>
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> >>
>> >>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> >>         java.lang.Thread.run(Thread.java:662)
>> >> Driver stacktrace:
>> >> at org.apache.spark.scheduler.DAGScheduler.org
>> >>
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>> >> at
>> >>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>> >> at
>> >>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>> >> at
>> >>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >> at
>> >>
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>> >> at
>> >>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> >> at
>> >>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> >> at scala.Option.foreach(Option.scala:236)
>> >> at
>> >>
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>> >> at
>> >>
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> >> at
>> >>
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >> at
>> >>
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >> at
>> >>
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
>> >>
>> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
>> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mk...@oculusinfo.com>
>> >> wrote:
>> >> >> Im trying run some spark code on a cluster but I keep running into a
>> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My
>> task
>> >> >> involves analyzing ~50GB of data (some operations involve sorting)
>> then
>> >> >> writing them out to a JSON file. Im running the analysis on each of
>> the
>> >> >> data's ~10 columns and have never had a successful run. My program
>> >> seems to
>> >> >> run for a varying amount of time each time (~between 5-30 minutes)
>> but
>> >> it
>> >> >> always terminates with this error.
>> >> >
>> >> > I can tell you that this usually means somewhere something wrote
>> >> > objects to the same OutputStream with multiple ObjectOutputStreams. AC
>> >> > is a header value.
>> >> >
>> >> > I don't obviously see where/how that could happen, but maybe it rings
>> >> > a bell for someone. This could happen if an OutputStream is reused
>> >> > across object serializations but new ObjectOutputStreams are opened,
>> >> > for example.
>> >>
>> >
>> >
>> >
>> > --
>> >
>> > SUREN HIRAMAN, VP TECHNOLOGY
>> > Velos
>> > Accelerating Machine Learning
>> >
>> > 440 NINTH AVENUE, 11TH FLOOR
>> > NEW YORK, NY 10001
>> > O: (917) 525-2466 ext. 105
>> > F: 646.349.4063
>> > E: suren.hiraman@v <su...@sociocast.com>elos.io
>> > W: www.velos.io
>>
>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v <su...@sociocast.com>elos.io
> W: www.velos.io

Re: Java IO Stream Corrupted - Invalid Type AC?

Posted by Surendranauth Hiraman <su...@velos.io>.
Patrick,

My team is using shuffle consolidation but not speculation. We are also
using persist(DISK_ONLY) for caching.

Here are some config changes that are in our work-in-progress.

We've been trying for 2 weeks to get our production flow (maybe around
50-70 stages, a few forks and joins with up to 20 branches in the forks) to
run end to end without any success, running into other problems besides
this one as well. For example, we have run into situations where saving to
HDFS just hangs on a couple of tasks, which are printing out nothing in
their logs and not taking any CPU. For testing, our input data is 10 GB
across 320 input splits and generates maybe around 200-300 GB of
intermediate and final data.


        conf.set("spark.executor.memory", "14g")     // TODO make this
configurable

        // shuffle configs
        conf.set("spark.default.parallelism", "320") // TODO make this
configurable
        conf.set("spark.shuffle.consolidateFiles","true")

        conf.set("spark.shuffle.file.buffer.kb", "200")
        conf.set("spark.reducer.maxMbInFlight", "96")

        conf.set("spark.rdd.compress","true"

        // we ran into a problem with the default timeout of 60 seconds
        // this is also being set in the master's spark-env.sh. Not sure if
it needs to be in both places
        conf.set("spark.worker.timeout","180")

        // akka settings
        conf.set("spark.akka.threads", "300")
        conf.set("spark.akka.timeout", "180")
        conf.set("spark.akka.frameSize", "100")
        conf.set("spark.akka.batchSize", "30")
        conf.set("spark.akka.askTimeout", "30")

        // block manager
        conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
        conf.set("spark.blockManagerHeartBeatMs", "80000")

-Suren



On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pw...@gmail.com> wrote:

> Out of curiosity - are you guys using speculation, shuffle
> consolidation, or any other non-default option? If so that would help
> narrow down what's causing this corruption.
>
> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
> <su...@velos.io> wrote:
> > Matt/Ryan,
> >
> > Did you make any headway on this? My team is running into this also.
> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we
> > generate 100s of GBs in the flow itself.
> >
> > -Suren
> >
> >
> >
> >
> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <co...@gmail.com>
> wrote:
> >
> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
> >> cluster (no modifications to Spark or its dependencies). The error
> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
> >> edge list (GraphX worked beautifully on smaller data).
> >>
> >> Here's the stacktrace (it's quite similar to yours
> >> https://imgur.com/7iBA4nJ ).
> >>
> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
> >> 4 times; aborting job
> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
> >> VertexRDD.scala:100
> >> Exception in thread "main" org.apache.spark.SparkException: Job
> >> aborted due to stage failure: Task 5.599:39 failed 4 times, most
> >> recent failure: Exception failure in TID 29735 on host node18:
> >> java.io.StreamCorruptedException: invalid type code: AC
> >>
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
> >>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
> >>
> >>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> >>
> >>
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> >>
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>
> >>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> >>
> >>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>
> >>
> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
> >>
> >>
> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
> >>
> >>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
> >>
> >>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>
> >>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> >>
> >>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
> >>
> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>         java.lang.Thread.run(Thread.java:662)
> >> Driver stacktrace:
> >> at org.apache.spark.scheduler.DAGScheduler.org
> >>
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> >> at
> >>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >> at scala.Option.foreach(Option.scala:236)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> >> at
> >>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> at
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> at
> >>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> at
> >>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
> >>
> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mk...@oculusinfo.com>
> >> wrote:
> >> >> Im trying run some spark code on a cluster but I keep running into a
> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My
> task
> >> >> involves analyzing ~50GB of data (some operations involve sorting)
> then
> >> >> writing them out to a JSON file. Im running the analysis on each of
> the
> >> >> data's ~10 columns and have never had a successful run. My program
> >> seems to
> >> >> run for a varying amount of time each time (~between 5-30 minutes)
> but
> >> it
> >> >> always terminates with this error.
> >> >
> >> > I can tell you that this usually means somewhere something wrote
> >> > objects to the same OutputStream with multiple ObjectOutputStreams. AC
> >> > is a header value.
> >> >
> >> > I don't obviously see where/how that could happen, but maybe it rings
> >> > a bell for someone. This could happen if an OutputStream is reused
> >> > across object serializations but new ObjectOutputStreams are opened,
> >> > for example.
> >>
> >
> >
> >
> > --
> >
> > SUREN HIRAMAN, VP TECHNOLOGY
> > Velos
> > Accelerating Machine Learning
> >
> > 440 NINTH AVENUE, 11TH FLOOR
> > NEW YORK, NY 10001
> > O: (917) 525-2466 ext. 105
> > F: 646.349.4063
> > E: suren.hiraman@v <su...@sociocast.com>elos.io
> > W: www.velos.io
>



-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v <su...@sociocast.com>elos.io
W: www.velos.io

Re: Java IO Stream Corrupted - Invalid Type AC?

Posted by Surendranauth Hiraman <su...@velos.io>.
Patrick,

My team is using shuffle consolidation but not speculation. We are also
using persist(DISK_ONLY) for caching.

Here are some config changes that are in our work-in-progress.

We've been trying for 2 weeks to get our production flow (maybe around
50-70 stages, a few forks and joins with up to 20 branches in the forks) to
run end to end without any success, running into other problems besides
this one as well. For example, we have run into situations where saving to
HDFS just hangs on a couple of tasks, which are printing out nothing in
their logs and not taking any CPU. For testing, our input data is 10 GB
across 320 input splits and generates maybe around 200-300 GB of
intermediate and final data.


        conf.set("spark.executor.memory", "14g")     // TODO make this
configurable

        // shuffle configs
        conf.set("spark.default.parallelism", "320") // TODO make this
configurable
        conf.set("spark.shuffle.consolidateFiles","true")

        conf.set("spark.shuffle.file.buffer.kb", "200")
        conf.set("spark.reducer.maxMbInFlight", "96")

        conf.set("spark.rdd.compress","true"

        // we ran into a problem with the default timeout of 60 seconds
        // this is also being set in the master's spark-env.sh. Not sure if
it needs to be in both places
        conf.set("spark.worker.timeout","180")

        // akka settings
        conf.set("spark.akka.threads", "300")
        conf.set("spark.akka.timeout", "180")
        conf.set("spark.akka.frameSize", "100")
        conf.set("spark.akka.batchSize", "30")
        conf.set("spark.akka.askTimeout", "30")

        // block manager
        conf.set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
        conf.set("spark.blockManagerHeartBeatMs", "80000")

-Suren



On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell <pw...@gmail.com> wrote:

> Out of curiosity - are you guys using speculation, shuffle
> consolidation, or any other non-default option? If so that would help
> narrow down what's causing this corruption.
>
> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
> <su...@velos.io> wrote:
> > Matt/Ryan,
> >
> > Did you make any headway on this? My team is running into this also.
> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we
> > generate 100s of GBs in the flow itself.
> >
> > -Suren
> >
> >
> >
> >
> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <co...@gmail.com>
> wrote:
> >
> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
> >> cluster (no modifications to Spark or its dependencies). The error
> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
> >> edge list (GraphX worked beautifully on smaller data).
> >>
> >> Here's the stacktrace (it's quite similar to yours
> >> https://imgur.com/7iBA4nJ ).
> >>
> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
> >> 4 times; aborting job
> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
> >> VertexRDD.scala:100
> >> Exception in thread "main" org.apache.spark.SparkException: Job
> >> aborted due to stage failure: Task 5.599:39 failed 4 times, most
> >> recent failure: Exception failure in TID 29735 on host node18:
> >> java.io.StreamCorruptedException: invalid type code: AC
> >>
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
> >>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
> >>
> >>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> >>
> >>
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> >>
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>
> >>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> >>
> >>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>
> >>
> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
> >>
> >>
> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
> >>
> >>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
> >>
> >>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
> >>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>
> >>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> >>
> >>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> >>         org.apache.spark.scheduler.Task.run(Task.scala:51)
> >>
> >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>         java.lang.Thread.run(Thread.java:662)
> >> Driver stacktrace:
> >> at org.apache.spark.scheduler.DAGScheduler.org
> >>
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
> >> at
> >>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
> >> at scala.Option.foreach(Option.scala:236)
> >> at
> >>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
> >> at
> >>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> at
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> at
> >>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> at
> >>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
> >>
> >> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
> >> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mk...@oculusinfo.com>
> >> wrote:
> >> >> Im trying run some spark code on a cluster but I keep running into a
> >> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My
> task
> >> >> involves analyzing ~50GB of data (some operations involve sorting)
> then
> >> >> writing them out to a JSON file. Im running the analysis on each of
> the
> >> >> data's ~10 columns and have never had a successful run. My program
> >> seems to
> >> >> run for a varying amount of time each time (~between 5-30 minutes)
> but
> >> it
> >> >> always terminates with this error.
> >> >
> >> > I can tell you that this usually means somewhere something wrote
> >> > objects to the same OutputStream with multiple ObjectOutputStreams. AC
> >> > is a header value.
> >> >
> >> > I don't obviously see where/how that could happen, but maybe it rings
> >> > a bell for someone. This could happen if an OutputStream is reused
> >> > across object serializations but new ObjectOutputStreams are opened,
> >> > for example.
> >>
> >
> >
> >
> > --
> >
> > SUREN HIRAMAN, VP TECHNOLOGY
> > Velos
> > Accelerating Machine Learning
> >
> > 440 NINTH AVENUE, 11TH FLOOR
> > NEW YORK, NY 10001
> > O: (917) 525-2466 ext. 105
> > F: 646.349.4063
> > E: suren.hiraman@v <su...@sociocast.com>elos.io
> > W: www.velos.io
>



-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v <su...@sociocast.com>elos.io
W: www.velos.io

Re: Java IO Stream Corrupted - Invalid Type AC?

Posted by Patrick Wendell <pw...@gmail.com>.
Out of curiosity - are you guys using speculation, shuffle
consolidation, or any other non-default option? If so that would help
narrow down what's causing this corruption.

On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
<su...@velos.io> wrote:
> Matt/Ryan,
>
> Did you make any headway on this? My team is running into this also.
> Doesn't happen on smaller datasets. Our input set is about 10 GB but we
> generate 100s of GBs in the flow itself.
>
> -Suren
>
>
>
>
> On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <co...@gmail.com> wrote:
>
>> Just ran into this today myself. I'm on branch-1.0 using a CDH3
>> cluster (no modifications to Spark or its dependencies). The error
>> appeared trying to run GraphX's .connectedComponents() on a ~200GB
>> edge list (GraphX worked beautifully on smaller data).
>>
>> Here's the stacktrace (it's quite similar to yours
>> https://imgur.com/7iBA4nJ ).
>>
>> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
>> 4 times; aborting job
>> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
>> VertexRDD.scala:100
>> Exception in thread "main" org.apache.spark.SparkException: Job
>> aborted due to stage failure: Task 5.599:39 failed 4 times, most
>> recent failure: Exception failure in TID 29735 on host node18:
>> java.io.StreamCorruptedException: invalid type code: AC
>>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
>>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>>
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>>
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>
>> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>>
>> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>>
>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>>
>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>>
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>         java.lang.Thread.run(Thread.java:662)
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
>>
>> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
>> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mk...@oculusinfo.com>
>> wrote:
>> >> Im trying run some spark code on a cluster but I keep running into a
>> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My task
>> >> involves analyzing ~50GB of data (some operations involve sorting) then
>> >> writing them out to a JSON file. Im running the analysis on each of the
>> >> data's ~10 columns and have never had a successful run. My program
>> seems to
>> >> run for a varying amount of time each time (~between 5-30 minutes) but
>> it
>> >> always terminates with this error.
>> >
>> > I can tell you that this usually means somewhere something wrote
>> > objects to the same OutputStream with multiple ObjectOutputStreams. AC
>> > is a header value.
>> >
>> > I don't obviously see where/how that could happen, but maybe it rings
>> > a bell for someone. This could happen if an OutputStream is reused
>> > across object serializations but new ObjectOutputStreams are opened,
>> > for example.
>>
>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v <su...@sociocast.com>elos.io
> W: www.velos.io

Re: Java IO Stream Corrupted - Invalid Type AC?

Posted by Patrick Wendell <pw...@gmail.com>.
Out of curiosity - are you guys using speculation, shuffle
consolidation, or any other non-default option? If so that would help
narrow down what's causing this corruption.

On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
<su...@velos.io> wrote:
> Matt/Ryan,
>
> Did you make any headway on this? My team is running into this also.
> Doesn't happen on smaller datasets. Our input set is about 10 GB but we
> generate 100s of GBs in the flow itself.
>
> -Suren
>
>
>
>
> On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton <co...@gmail.com> wrote:
>
>> Just ran into this today myself. I'm on branch-1.0 using a CDH3
>> cluster (no modifications to Spark or its dependencies). The error
>> appeared trying to run GraphX's .connectedComponents() on a ~200GB
>> edge list (GraphX worked beautifully on smaller data).
>>
>> Here's the stacktrace (it's quite similar to yours
>> https://imgur.com/7iBA4nJ ).
>>
>> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
>> 4 times; aborting job
>> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
>> VertexRDD.scala:100
>> Exception in thread "main" org.apache.spark.SparkException: Job
>> aborted due to stage failure: Task 5.599:39 failed 4 times, most
>> recent failure: Exception failure in TID 29735 on host node18:
>> java.io.StreamCorruptedException: invalid type code: AC
>>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
>>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>>
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>>
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
>>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
>>
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>
>> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
>>
>> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
>>
>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>>
>> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>         scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>>
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>         org.apache.spark.scheduler.Task.run(Task.scala:51)
>>
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>         java.lang.Thread.run(Thread.java:662)
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 14/06/05 20:02:28 INFO scheduler.TaskSchedulerImpl: Cancelling stage 5
>>
>> On Wed, Jun 4, 2014 at 7:50 AM, Sean Owen <so...@cloudera.com> wrote:
>> > On Wed, Jun 4, 2014 at 3:33 PM, Matt Kielo <mk...@oculusinfo.com>
>> wrote:
>> >> Im trying run some spark code on a cluster but I keep running into a
>> >> "java.io.StreamCorruptedException: invalid type code: AC" error. My task
>> >> involves analyzing ~50GB of data (some operations involve sorting) then
>> >> writing them out to a JSON file. Im running the analysis on each of the
>> >> data's ~10 columns and have never had a successful run. My program
>> seems to
>> >> run for a varying amount of time each time (~between 5-30 minutes) but
>> it
>> >> always terminates with this error.
>> >
>> > I can tell you that this usually means somewhere something wrote
>> > objects to the same OutputStream with multiple ObjectOutputStreams. AC
>> > is a header value.
>> >
>> > I don't obviously see where/how that could happen, but maybe it rings
>> > a bell for someone. This could happen if an OutputStream is reused
>> > across object serializations but new ObjectOutputStreams are opened,
>> > for example.
>>
>
>
>
> --
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v <su...@sociocast.com>elos.io
> W: www.velos.io