Posted to user@spark.apache.org by Ryan Compton <co...@gmail.com> on 2014/01/18 00:02:48 UTC

FileNotFoundException on distinct()?

When I try .distinct() my jobs fail. Possibly related:
https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo

This works

    //get the node ids
    val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
    //count the nodes
    val numNodes = nodes.count()
    logWarning("numNodes:\t"+numNodes)

this fails

    //get the node ids
    val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
    //count the nodes
    val numNodes = nodes.distinct().count()
    logWarning("numNodes:\t"+numNodes)

with these stack traces:

14/01/17 14:54:37 WARN scripts.ComputeNetworkStats: numEdges: 915189977
14/01/17 14:54:37 INFO rdd.MappedRDD: Removing RDD 1 from persistence list
--
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
java.io.IOException
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:299)
at org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:77)
at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.read(DFSClient.java:2317)
at java.io.DataInputStream.read(DataInputStream.java:83)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:205)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:169)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:160)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:103)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:83)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task
2.0:1419 as TID 1396 on executor 6: node25 (PROCESS_LOCAL)
--
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
java.lang.IllegalStateException
java.lang.IllegalStateException: Shutdown in progress
at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:39)
at java.lang.Runtime.addShutdownHook(Runtime.java:192)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:1655)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1627)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:183)
at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:92)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:54)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:93)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:83)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:29)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task
2.0:1419 as TID 1403 on executor 6: node25 (PROCESS_LOCAL)
--
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
java.io.FileNotFoundException
java.io.FileNotFoundException:
/tmp/spark-local-20140117145333-5d01/27/shuffle_0_54_1140 (Too many
open files)
at java.io.FileOutputStream.openAppend(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task
2.0:54 as TID 1407 on executor 6: node25 (NODE_LOCAL)
--
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
java.io.FileNotFoundException
java.io.FileNotFoundException:
/tmp/spark-local-20140117145333-5d01/0b/shuffle_0_441_482 (Too many
open files)
at java.io.FileOutputStream.openAppend(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task
2.0:441 as TID 1415 on executor 6: node25 (PROCESS_LOCAL)
--
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
java.io.FileNotFoundException
java.io.FileNotFoundException:
/tmp/spark-local-20140117145333-5d01/0b/shuffle_0_238_365 (Too many
open files)
at java.io.FileOutputStream.openAppend(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task
2.0:238 as TID 1423 on executor 6: node25 (NODE_LOCAL)
--
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
java.io.FileNotFoundException
java.io.FileNotFoundException:
/tmp/spark-local-20140117145333-5d01/35/shuffle_0_37_144 (Too many
open files)
at java.io.FileOutputStream.openAppend(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task
2.0:37 as TID 1424 on executor 6: node25 (NODE_LOCAL)
--
14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
java.lang.IllegalStateException: Shutdown in progress [duplicate 19]
14/01/17 14:56:07 ERROR cluster.ClusterTaskSetManager: Task 2.0:1679
failed more than 4 times; aborting job
14/01/17 14:56:07 INFO cluster.ClusterScheduler: Remove TaskSet 2.0 from pool
--
14/01/17 14:56:07 INFO scheduler.DAGScheduler: Failed to run count at
ComputeNetworkStats.scala:59
Exception in thread "main" org.apache.spark.SparkException: Job
failed: Task 2.0:1679 failed more than 4 times
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
14/01/17 14:56:07 INFO cluster.ClusterScheduler: Ignoring update from
TID 1418 because its task set is gone

Re: FileNotFoundException on distinct()?

Posted by Andrew Or <an...@gmail.com>.
Hi Jiacheng,

What change did you make to your code? In particular, did you directly
create an ExternalAppendOnlyMap, or did you use it through an RDD operation?

The error that you got simply means that your code calls next() on
ExternalAppendOnlyMap's iterator when there are no more elements to be read
(i.e. hasNext is false). This should not happen if you use
ExternalAppendOnlyMap through the standard RDD operations such as
reduceByKey or cogroup.
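
For illustration, a minimal sketch of that pattern (assuming a SparkContext
named sc and a hypothetical input path; the spillable map is driven by the
aggregation itself, so user code never touches its iterator):

    // Requires: import org.apache.spark.SparkContext._ for the pair-RDD implicits.
    // reduceByKey aggregates through Spark's internal append-only map (which
    // may spill to disk when spark.shuffle.spill is enabled); user code never
    // calls next() on ExternalAppendOnlyMap directly.
    val counts = sc.textFile("hdfs:///tmp/words")   // hypothetical input path
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.count()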

Andrew


On Mon, Jan 20, 2014 at 4:22 AM, guojc <gu...@gmail.com> wrote:

Hi,
  I'm trying out the latest master branch of Spark for the exciting external
hashmap feature. I have code that runs correctly on Spark 0.8.1, and I only
made a change so that it spills to disk more easily. However, I encountered a
few task failures of
java.util.NoSuchElementException (java.util.NoSuchElementException)
org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:277)
org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.next(ExternalAppendOnlyMap.scala:212)
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
And the job seems unable to recover.
Can anyone give some suggestions on how to investigate the issue?
Thanks,
Jiacheng Guo

Re: FileNotFoundException on distinct()?

Posted by Andrew Ash <an...@andrewash.com>.
Also, you will need to bounce the Spark services from a new ssh session to
make the ulimit changes take effect (if you changed the value in /etc/limits).

Sent from my mobile phone
On Jan 20, 2014 5:32 PM, "Jey Kottalam" <je...@cs.berkeley.edu> wrote:

> Can you try ulimit -n to make sure the increased limit has taken effect?
>
> On Monday, January 20, 2014, Ryan Compton <co...@gmail.com> wrote:
>
>> I've got
>>
>>     System.setProperty("spark.shuffle.consolidate.files", "true");
>>
>> but I'm getting the same error.
>>
>> The output of the distinct count will be 101,230,940 (I did it in
>> pig). I've got 13 nodes and each node allows 13,069,279 open files. So
>> even with 1 record per file I think I've got enough. But what do the
>> rest of you have for /proc/sys/fs/file-max?
>>
>> On Sun, Jan 19, 2014 at 5:12 PM, Mark Hamstra <ma...@clearstorydata.com>
>> wrote:
>> > You should try setting spark.shuffle.consolidate.files to true.
>> >
>> >
>> > On Sun, Jan 19, 2014 at 4:49 PM, Ryan Compton <co...@gmail.com>
>> > wrote:
>> >>
>> >> I think I've shuffled this data before (I often join on it), and I
>> >> know I was using distinct() in 0.7.3 for the same computation.
>> >>
>> >> What do people usually have in  /proc/sys/fs/file-max? I'm real
>> >> surprised that 13M isn't enough.
>> >>
>> >> On Sat, Jan 18, 2014 at 11:47 PM, Mark Hamstra <mark@clearstorydata.com>
>> >> wrote:
>> >> > distinct() needs to do a shuffle -- which is resulting in the need to
>> >> > materialize the map outputs as files.  count() doesn't.
>> >> >
>> >> >
>> >> > On Sat, Jan 18, 2014 at 10:33 PM, Ryan Compton <compton.ryan@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> I'm able to open ~13M files. I expect the output of
>> >> >> .distinct().count() to be under 100M, why do I need so many files
>> >> >> open?
>> >> >>
>> >> >> rfcompton@node19 ~> cat /etc/redhat-release
>> >> >> CentOS release 5.7 (Final)
>> >> >> rfcompton@node19 ~> cat /proc/sys/fs/file-max
>> >> >> 13069279
>> >> >>
>> >> >> On Sat, Jan 18, 2014 at 9:12 AM, Jey Kottalam <je...@cs.berkeley.edu>
>> >> >> wrote:
>> >> >> > The "too many open files" error is due to running out of available
>> >> >> > FDs, usually due to a limit set in the OS.
>> >> >> >
>> >> >> > The fix will depend on your specific OS, but under Linux it usually
>> >> >> > involves the "fs.file-max" sysctl.
>> >> >> >
>> >> >> > On Fri, Jan 17, 2014 at 3:02 PM, Ryan Compton
>> >> >> > <co...@gmail.com>
>> >> >> > wrote:
>> >> >> >> When I try .distinct() my jobs fail. Possibly related:
>> >> >> >> https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo

Re: FileNotFoundException on distinct()?

Posted by Jey Kottalam <je...@cs.berkeley.edu>.
Can you try ulimit -n to make sure the increased limit has taken effect?
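
For example (a sketch; the values shown are illustrative, not recommendations):

    # Per-process limit in the shell that launches the workers:
    ulimit -n
    # System-wide limit:
    cat /proc/sys/fs/file-max
    # Raise the system-wide cap (illustrative value):
    sysctl -w fs.file-max=2097152
    # Raise the per-process limit for future sessions, e.g. a line in
    # /etc/security/limits.conf (illustrative user and value):
    #   sparkuser  -  nofile  65536

Note that "Too many open files" usually means the per-process limit (ulimit
-n) was hit, not the system-wide fs.file-max.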

On Monday, January 20, 2014, Ryan Compton <co...@gmail.com> wrote:

> I've got
>
>     System.setProperty("spark.shuffle.consolidate.files", "true");
>
> but I'm getting the same error.
>
> The output of the distinct count will be 101,230,940 (I did it in
> pig). I've got 13 nodes and each node allows 13,069,279 open files. So
> even with 1 record per file I think I've got enough. But what do the
> rest of you have for /proc/sys/fs/file-max?
>
> On Sun, Jan 19, 2014 at 5:12 PM, Mark Hamstra <ma...@clearstorydata.com>
> wrote:
> > You should try setting spark.shuffle.consolidate.files to true.
> >
> >
> > On Sun, Jan 19, 2014 at 4:49 PM, Ryan Compton <co...@gmail.com>
> > wrote:
> >>
> >> I think I've shuffled this data before (I often join on it), and I
> >> know I was using distinct() in 0.7.3 for the same computation.
> >>
> >> What do people usually have in  /proc/sys/fs/file-max? I'm real
> >> surprised that 13M isn't enough.
> >>
> >> On Sat, Jan 18, 2014 at 11:47 PM, Mark Hamstra <mark@clearstorydata.com>
> >> wrote:
> >> > distinct() needs to do a shuffle -- which is resulting in the need to
> >> > materialize the map outputs as files.  count() doesn't.
> >> >
> >> >
> >> > On Sat, Jan 18, 2014 at 10:33 PM, Ryan Compton <compton.ryan@gmail.com>
> >> > wrote:
> >> >>
> >> >> I'm able to open ~13M files. I expect the output of
> >> >> .distinct().count() to be under 100M, why do I need so many files
> >> >> open?
> >> >>
> >> >> rfcompton@node19 ~> cat /etc/redhat-release
> >> >> CentOS release 5.7 (Final)
> >> >> rfcompton@node19 ~> cat /proc/sys/fs/file-max
> >> >> 13069279
> >> >>
> >> >> On Sat, Jan 18, 2014 at 9:12 AM, Jey Kottalam <je...@cs.berkeley.edu>
> >> >> wrote:
> >> >> > The "too many open files" error is due to running out of available
> >> >> > FDs, usually due to a limit set in the OS.
> >> >> >
> >> >> > The fix will depend on your specific OS, but under Linux it usually
> >> >> > involves the "fs.file-max" sysctl.
> >> >> >
> >> >> > On Fri, Jan 17, 2014 at 3:02 PM, Ryan Compton
> >> >> > <co...@gmail.com>
> >> >> > wrote:
> >> >> >> When I try .distinct() my jobs fail. Possibly related:
> >> >> >> https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo

Re: FileNotFoundException on distinct()?

Posted by Ryan Compton <co...@gmail.com>.
I've got

    System.setProperty("spark.shuffle.consolidate.files", "true");

but I'm getting the same error.

The output of the distinct count will be 101,230,940 (I did it in
Pig). I've got 13 nodes and each node allows 13,069,279 open files. So
even with 1 record per file I think I've got enough. But what do the
rest of you have for /proc/sys/fs/file-max?
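
(For scale, a rough sketch of the shuffle-file arithmetic, if I assume the
~1680 partitions suggested by task IDs like 2.0:1679 above and, hypothetically,
8 concurrent tasks per node:

    files written per node   ~= map tasks per node x reduce partitions
                             ~= (1680 / 13) x 1680 ~= 217,000
    open at once per process ~= concurrent tasks x reduce partitions
                             ~= 8 x 1680 = 13,440

so the binding limit would be the per-process descriptor limit, not the
number of records per file.)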

On Sun, Jan 19, 2014 at 5:12 PM, Mark Hamstra <ma...@clearstorydata.com> wrote:
> You should try setting spark.shuffle.consolidate.files to true.
>
>
> On Sun, Jan 19, 2014 at 4:49 PM, Ryan Compton <co...@gmail.com>
> wrote:
>>
>> I think I've shuffled this data before (I often join on it), and I
>> know I was using distinct() in 0.7.3 for the same computation.
>>
>> What do people usually have in  /proc/sys/fs/file-max? I'm real
>> surprised that 13M isn't enough.
>>
>> On Sat, Jan 18, 2014 at 11:47 PM, Mark Hamstra <ma...@clearstorydata.com>
>> wrote:
>> > distinct() needs to do a shuffle -- which is resulting in the need to
>> > materialize the map outputs as files.  count() doesn't.
>> >
>> >
>> > On Sat, Jan 18, 2014 at 10:33 PM, Ryan Compton <co...@gmail.com>
>> > wrote:
>> >>
>> >> I'm able to open ~13M files. I expect the output of
>> >> .distinct().count() to be under 100M, why do I need so many files
>> >> open?
>> >>
>> >> rfcompton@node19 ~> cat /etc/redhat-release
>> >> CentOS release 5.7 (Final)
>> >> rfcompton@node19 ~> cat /proc/sys/fs/file-max
>> >> 13069279
>> >>
>> >> On Sat, Jan 18, 2014 at 9:12 AM, Jey Kottalam <je...@cs.berkeley.edu>
>> >> wrote:
>> >> > The "too many open files" error is due to running out of available
>> >> > FDs, usually due to a limit set in the OS.
>> >> >
>> >> > The fix will depend on your specific OS, but under Linux it usually
>> >> > involves the "fs.file-max" sysctl.
>> >> >
>> >> > On Fri, Jan 17, 2014 at 3:02 PM, Ryan Compton
>> >> > <co...@gmail.com>
>> >> > wrote:
>> >> >> When I try .distinct() my jobs fail. Possibly related:
>> >> >> https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo

Re: FileNotFoundException on distinct()?

Posted by Mark Hamstra <ma...@clearstorydata.com>.
You should try setting spark.shuffle.consolidate.files to true (see
http://spark.incubator.apache.org/docs/latest/configuration.html).
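
A minimal sketch, assuming 0.8.x-style configuration through system properties
(the master URL, input path, and partition count are illustrative, and passing
an explicit partition count to distinct() is a separate, hypothetical way to
cut the file count):

    import org.apache.spark.SparkContext

    // Must be set before the SparkContext is created. With consolidation on,
    // concurrent map tasks append to one file per (core, reduce partition)
    // rather than creating one file per (map task, reduce partition).
    System.setProperty("spark.shuffle.consolidate.files", "true")
    val sc = new SparkContext("spark://master:7077", "ComputeNetworkStats")

    // Fewer reduce partitions also means fewer shuffle files:
    val nodes = sc.textFile("hdfs:///tmp/edges").map(_.split("\t")(0))
    val numNodes = nodes.distinct(1000).count()   // illustrative partition count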


On Sun, Jan 19, 2014 at 4:49 PM, Ryan Compton <co...@gmail.com> wrote:

> I think I've shuffled this data before (I often join on it), and I
> know I was using distinct() in 0.7.3 for the same computation.
>
> What do people usually have in  /proc/sys/fs/file-max? I'm real
> surprised that 13M isn't enough.
>
> On Sat, Jan 18, 2014 at 11:47 PM, Mark Hamstra <ma...@clearstorydata.com>
> wrote:
> > distinct() needs to do a shuffle -- which is resulting in the need to
> > materialize the map outputs as files.  count() doesn't.
> >
> >
> > On Sat, Jan 18, 2014 at 10:33 PM, Ryan Compton <co...@gmail.com>
> > wrote:
> >>
> >> I'm able to open ~13M files. I expect the output of
> >> .distinct().count() to be under 100M, why do I need so many files
> >> open?
> >>
> >> rfcompton@node19 ~> cat /etc/redhat-release
> >> CentOS release 5.7 (Final)
> >> rfcompton@node19 ~> cat /proc/sys/fs/file-max
> >> 13069279
> >>
> >> On Sat, Jan 18, 2014 at 9:12 AM, Jey Kottalam <je...@cs.berkeley.edu>
> >> wrote:
> >> > The "too many open files" error is due to running out of available
> >> > FDs, usually due to a limit set in the OS.
> >> >
> >> > The fix will depend on your specific OS, but under Linux it usually
> >> > involves the "fs.file-max" sysctl.
> >> >
> >> >> On Fri, Jan 17, 2014 at 3:02 PM, Ryan Compton <compton.ryan@gmail.com>
> >> >> wrote:
> >> >> When I try .distinct() my jobs fail. Possibly related:
> >> >> https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo

Re: FileNotFoundException on distinct()?

Posted by Ryan Compton <co...@gmail.com>.
I think I've shuffled this data before (I often join on it), and I
know I was using distinct() in 0.7.3 for the same computation.

What do people usually have in /proc/sys/fs/file-max? I'm really
surprised that 13M isn't enough.
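
Two things I'm planning to try, for the record (neither verified yet).
First, check the per-process limit on the executors: "Too many open
files" is the per-process EMFILE error, so the executors' "ulimit -n"
(the "nofile" setting) matters, not just the system-wide fs.file-max.
Second, shuffle into fewer partitions so each map task writes fewer
shuffle files; distinct() takes a partition count:

    //fewer reduce-side partitions => fewer shuffle files per map task
    //(200 is just a guess at a partition count, tune it for the data)
    val numNodes = nodes.distinct(200).count()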

On Sat, Jan 18, 2014 at 11:47 PM, Mark Hamstra <ma...@clearstorydata.com> wrote:
> distinct() needs to do a shuffle -- which requires materializing the
> map outputs as files. count() doesn't.

Re: FileNotFoundException on distinct()?

Posted by Mark Hamstra <ma...@clearstorydata.com>.
distinct() needs to do a shuffle -- which requires materializing the
map outputs as files. count() doesn't.

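For intuition, distinct() boils down to a shuffle over key-value pairs.
This is a rough sketch of the moral equivalent (not the exact Spark
source) using the nodes RDD from the original post; the reduceByKey
step is the shuffle that writes the map-output files:

    //assumes: import org.apache.spark.SparkContext._ for pair-RDD ops
    //distinct, spelled out: key each element, keep one value per key,
    //then drop the dummy values. reduceByKey forces the shuffle.
    val distinctNodes = nodes.map(x => (x, 1))
                             .reduceByKey((a, b) => a)
                             .map(_._1)

count() on the cached RDD, by contrast, just iterates each partition
and sums the sizes, so no shuffle files are ever created. Also note
that with hash-based shuffle each map task writes one file per reduce
partition, so the number of shuffle files scales like (map tasks) x
(reduce partitions), not like the size of the distinct output.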

On Sat, Jan 18, 2014 at 10:33 PM, Ryan Compton <co...@gmail.com> wrote:

> I'm able to open ~13M files. I expect the output of
> .distinct().count() to be under 100M; why do I need so many files
> open?
>
> rfcompton@node19 ~> cat /etc/redhat-release
> CentOS release 5.7 (Final)
> rfcompton@node19 ~> cat /proc/sys/fs/file-max
> 13069279

Re: FileNotFoundException on distinct()?

Posted by Ryan Compton <co...@gmail.com>.
I'm able to open ~13M files. I expect the output of
.distinct().count() to be under 100M; why do I need so many files
open?

rfcompton@node19 ~> cat /etc/redhat-release
CentOS release 5.7 (Final)
rfcompton@node19 ~> cat /proc/sys/fs/file-max
13069279

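(Back-of-the-envelope, in case my mental model is wrong: the task ids
in the log run past 2.0:1679, so stage 2 has on the order of 2000
partitions. If each map task writes one shuffle file per reduce
partition, that's roughly 2000 x 2000 = ~4M shuffle files for the
stage, i.e. quadratic in the partition count rather than proportional
to the size of the distinct output. If I remember right, newer builds
also have a spark.shuffle.consolidateFiles option that cuts this down,
but I haven't tried it.)
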
On Sat, Jan 18, 2014 at 9:12 AM, Jey Kottalam <je...@cs.berkeley.edu> wrote:
> The "too many open files" error is due to running out of available
> FDs, usually due to a limit set in the OS.
>
> The fix will depend on your specific OS, but under Linux it usually
> involves the "fs.file-max" syctl.

Re: FileNotFoundException on distinct()?

Posted by Jey Kottalam <je...@cs.berkeley.edu>.
The "too many open files" error is due to running out of available
FDs, usually due to a limit set in the OS.

The fix will depend on your specific OS, but under Linux it usually
involves the "fs.file-max" syctl.

On Fri, Jan 17, 2014 at 3:02 PM, Ryan Compton <co...@gmail.com> wrote:
> When I try .distinct() my jobs fail. Possibly related:
> https://groups.google.com/forum/#!topic/shark-users/j2TO-GINuFo
>
> This works
>
>     //get the node ids
>     val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
>     //count the nodes
>     val numNodes = nodes.count()
>     logWarning("numNodes:\t"+numNodes)
>
> this fails
>
>     //get the node ids
>     val nodes = dupedKeyedEdgeList.map(x => x._1).cache()
>     //count the nodes
>     val numNodes = nodes.distinct().count()
>     logWarning("numNodes:\t"+numNodes)
>
> with these stacktraces:
>
> 14/01/17 14:54:37 WARN scripts.ComputeNetworkStats: numEdges: 915189977
> 14/01/17 14:54:37 INFO rdd.MappedRDD: Removing RDD 1 from persistence list
> --
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
> java.io.IOException
> java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:299)
> at org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:77)
> at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.read(DFSClient.java:2317)
> at java.io.DataInputStream.read(DataInputStream.java:83)
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:205)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:169)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:160)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:103)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:83)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task
> 2.0:1419 as TID 1396 on executor 6: node25 (PROCESS_LOCAL)
> --
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to
> java.lang.IllegalStateException
> java.lang.IllegalStateException: Shutdown in progress
> at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:39)
> at java.lang.Runtime.addShutdownHook(Runtime.java:192)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:1655)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1627)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:183)
> at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:92)
> at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:54)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:93)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:83)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:51)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:29)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:69)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:1419 as TID 1403 on executor 6: node25 (PROCESS_LOCAL)
> --
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.FileNotFoundException
> java.io.FileNotFoundException: /tmp/spark-local-20140117145333-5d01/27/shuffle_0_54_1140 (Too many open files)
> at java.io.FileOutputStream.openAppend(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:192)
> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.open(DiskStore.scala:58)
> at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:107)
> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
> at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.JavaConversions$JMapWrapperLike$$anon$2.foreach(JavaConversions.scala:781)
> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
> at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:54 as TID 1407 on executor 6: node25 (NODE_LOCAL)
> --
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.FileNotFoundException
> java.io.FileNotFoundException: /tmp/spark-local-20140117145333-5d01/0b/shuffle_0_441_482 (Too many open files)
> [stack trace identical to the FileNotFoundException trace above]
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:441 as TID 1415 on executor 6: node25 (PROCESS_LOCAL)
> --
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.FileNotFoundException
> java.io.FileNotFoundException: /tmp/spark-local-20140117145333-5d01/0b/shuffle_0_238_365 (Too many open files)
> [stack trace identical to the FileNotFoundException trace above]
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:238 as TID 1423 on executor 6: node25 (NODE_LOCAL)
> --
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.FileNotFoundException
> java.io.FileNotFoundException: /tmp/spark-local-20140117145333-5d01/35/shuffle_0_37_144 (Too many open files)
> [stack trace identical to the FileNotFoundException trace above]
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Starting task 2.0:37 as TID 1424 on executor 6: node25 (NODE_LOCAL)
> --
> 14/01/17 14:56:07 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: Shutdown in progress [duplicate 19]
> 14/01/17 14:56:07 ERROR cluster.ClusterTaskSetManager: Task 2.0:1679 failed more than 4 times; aborting job
> 14/01/17 14:56:07 INFO cluster.ClusterScheduler: Remove TaskSet 2.0 from pool
> --
> 14/01/17 14:56:07 INFO scheduler.DAGScheduler: Failed to run count at ComputeNetworkStats.scala:59
> Exception in thread "main" org.apache.spark.SparkException: Job failed: Task 2.0:1679 failed more than 4 times
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> 14/01/17 14:56:07 INFO cluster.ClusterScheduler: Ignoring update from TID 1418 because its task set is gone
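
A note on the failure mode above: the "Too many open files" traces all
originate in DiskStore$DiskBlockObjectWriter.open inside a
ShuffleMapTask, i.e. in the shuffle that distinct() introduces (count()
alone needs no shuffle, which is why the first snippet works). In the
hash-based shuffle visible in these frames, each map task opens one
shuffle file per reduce partition, so with enough partitions on both
sides an executor can exhaust its file-descriptor limit; the "Shutdown
in progress" traces look like secondary casualties of the dying
executor. Raising the limit (ulimit -n) on the workers is the usual
remedy. A complementary workaround, sketched below under assumptions
(the master URL, input path, and the value 256 are all illustrative,
not taken from this thread), is to pass an explicit, smaller partition
count to distinct() so fewer shuffle files are open at once:

    import org.apache.spark.SparkContext

    object DistinctSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[4]", "distinct-sketch")
        // Hypothetical edge list: tab-separated node-id pairs, one per line.
        val edges = sc.textFile("hdfs:///path/to/edges")
        val nodes = edges.map(_.split("\t")(0)).cache()
        // RDD.distinct accepts an explicit partition count; a smaller
        // value means fewer simultaneously open shuffle files per map
        // task. 256 is an illustrative figure to tune against the
        // executors' open-file limit.
        val numNodes = nodes.distinct(256).count()
        println("numNodes:\t" + numNodes)
        sc.stop()
      }
    }

On releases that support it, setting spark.shuffle.consolidateFiles=true
attacks the same symptom from the other side, by sharing shuffle files
across map tasks instead of reducing the partition count.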