You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Sonal Goyal <so...@gmail.com> on 2014/03/27 08:36:54 UTC

Re:

Hi Hahn,

What's the ulimit on your systems? Please check the following link for a discussion on the too many files open.

http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCANGvG8qpn_WLLsRcJEGDB7HMza2uX7mYxZhfvTZ+b-sDxdKRUg@mail.gmail.com%3E


Sent from my iPad

> On Mar 27, 2014, at 12:15 PM, Hahn Jiang <ha...@gmail.com> wrote:
> 
> Hi, all
> 
> I write a spark program on yarn. When I use small size input file, my program can run well. But my job will failed if input size is more than 40G.
> 
> the error log:
> java.io.FileNotFoundException (java.io.FileNotFoundException: /home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890 (Too many open files))
> java.io.FileOutputStream.openAppend(Native Method)
> java.io.FileOutputStream.<init>(FileOutputStream.java:192)
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
> org.apache.spark.scheduler.Task.run(Task.scala:53)
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> java.lang.Thread.run(Thread.java:662)
> 
> 
> my object:
> object Test {
> 
>   def main(args: Array[String]) {
>     val sc = new SparkContext(args(0), "Test",
>       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
> 
>     val mg = sc.textFile("/user/.../part-*")
>     val mct = sc.textFile("/user/.../part-*")
> 
>     val pair1 = mg.map {
>       s =>
>         val cols = s.split("\t")
>         (cols(0), cols(1))
>     }
>     val pair2 = mct.map {
>       s =>
>         val cols = s.split("\t")
>         (cols(0), cols(1))
>     }
>     val merge = pair1.union(pair2)
>     val result = merge.reduceByKey(_ + _)
>     val outputPath = new Path("/user/xxx/temp/spark-output")
>     outputPath.getFileSystem(new Configuration()).delete(outputPath, true)
>     result.saveAsTextFile(outputPath.toString)
> 
>     System.exit(0)
>   }
> 
> }
> 
> My spark version is 0.9 and I run my job use this command "/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar ./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g --worker-cores 2"
> 

Re:

Posted by Hahn Jiang <ha...@gmail.com>.
I understand.  thanks


On Fri, Mar 28, 2014 at 4:10 AM, Mayur Rustagi <ma...@gmail.com>wrote:

> You have to raise the global limit as root. Also you have to do that on
> the whole cluster.
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Thu, Mar 27, 2014 at 4:07 AM, Hahn Jiang <ha...@gmail.com>wrote:
>
>> I set "ulimit -n 100000" in conf/spark-env.sh, is it too small?
>>
>>
>> On Thu, Mar 27, 2014 at 3:36 PM, Sonal Goyal <so...@gmail.com>wrote:
>>
>>> Hi Hahn,
>>>
>>> What's the ulimit on your systems? Please check the following link for a
>>> discussion on the too many files open.
>>>
>>>
>>> http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCANGvG8qpn_WLLsRcJEGDB7HMza2uX7mYxZhfvTZ+b-sDxdKRUg@mail.gmail.com%3E
>>>
>>>
>>> Sent from my iPad
>>>
>>> > On Mar 27, 2014, at 12:15 PM, Hahn Jiang <ha...@gmail.com>
>>> wrote:
>>> >
>>> > Hi, all
>>> >
>>> > I write a spark program on yarn. When I use small size input file, my
>>> program can run well. But my job will failed if input size is more than 40G.
>>> >
>>> > the error log:
>>> > java.io.FileNotFoundException (java.io.FileNotFoundException:
>>> /home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890
>>> (Too many open files))
>>> > java.io.FileOutputStream.openAppend(Native Method)
>>> > java.io.FileOutputStream.<init>(FileOutputStream.java:192)
>>> >
>>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
>>> >
>>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
>>> >
>>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
>>> >
>>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
>>> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >
>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
>>> >
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>>> >
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>>> > org.apache.spark.scheduler.Task.run(Task.scala:53)
>>> >
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>> >
>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>> >
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>> >
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>> > java.lang.Thread.run(Thread.java:662)
>>> >
>>> >
>>> > my object:
>>> > object Test {
>>> >
>>> >   def main(args: Array[String]) {
>>> >     val sc = new SparkContext(args(0), "Test",
>>> >       System.getenv("SPARK_HOME"),
>>> SparkContext.jarOfClass(this.getClass))
>>> >
>>> >     val mg = sc.textFile("/user/.../part-*")
>>> >     val mct = sc.textFile("/user/.../part-*")
>>> >
>>> >     val pair1 = mg.map {
>>> >       s =>
>>> >         val cols = s.split("\t")
>>> >         (cols(0), cols(1))
>>> >     }
>>> >     val pair2 = mct.map {
>>> >       s =>
>>> >         val cols = s.split("\t")
>>> >         (cols(0), cols(1))
>>> >     }
>>> >     val merge = pair1.union(pair2)
>>> >     val result = merge.reduceByKey(_ + _)
>>> >     val outputPath = new Path("/user/xxx/temp/spark-output")
>>> >     outputPath.getFileSystem(new Configuration()).delete(outputPath,
>>> true)
>>> >     result.saveAsTextFile(outputPath.toString)
>>> >
>>> >     System.exit(0)
>>> >   }
>>> >
>>> > }
>>> >
>>> > My spark version is 0.9 and I run my job use this command
>>> "/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
>>> ./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args
>>> yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g
>>> --worker-cores 2"
>>> >
>>>
>>
>>
>

Re:

Posted by Mayur Rustagi <ma...@gmail.com>.
You have to raise the global limit as root. Also you have to do that on the
whole cluster.
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Thu, Mar 27, 2014 at 4:07 AM, Hahn Jiang <ha...@gmail.com>wrote:

> I set "ulimit -n 100000" in conf/spark-env.sh, is it too small?
>
>
> On Thu, Mar 27, 2014 at 3:36 PM, Sonal Goyal <so...@gmail.com>wrote:
>
>> Hi Hahn,
>>
>> What's the ulimit on your systems? Please check the following link for a
>> discussion on the too many files open.
>>
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCANGvG8qpn_WLLsRcJEGDB7HMza2uX7mYxZhfvTZ+b-sDxdKRUg@mail.gmail.com%3E
>>
>>
>> Sent from my iPad
>>
>> > On Mar 27, 2014, at 12:15 PM, Hahn Jiang <ha...@gmail.com>
>> wrote:
>> >
>> > Hi, all
>> >
>> > I write a spark program on yarn. When I use small size input file, my
>> program can run well. But my job will failed if input size is more than 40G.
>> >
>> > the error log:
>> > java.io.FileNotFoundException (java.io.FileNotFoundException:
>> /home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890
>> (Too many open files))
>> > java.io.FileOutputStream.openAppend(Native Method)
>> > java.io.FileOutputStream.<init>(FileOutputStream.java:192)
>> >
>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
>> >
>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
>> >
>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
>> >
>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
>> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
>> >
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>> >
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>> > org.apache.spark.scheduler.Task.run(Task.scala:53)
>> >
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>> >
>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> > java.lang.Thread.run(Thread.java:662)
>> >
>> >
>> > my object:
>> > object Test {
>> >
>> >   def main(args: Array[String]) {
>> >     val sc = new SparkContext(args(0), "Test",
>> >       System.getenv("SPARK_HOME"),
>> SparkContext.jarOfClass(this.getClass))
>> >
>> >     val mg = sc.textFile("/user/.../part-*")
>> >     val mct = sc.textFile("/user/.../part-*")
>> >
>> >     val pair1 = mg.map {
>> >       s =>
>> >         val cols = s.split("\t")
>> >         (cols(0), cols(1))
>> >     }
>> >     val pair2 = mct.map {
>> >       s =>
>> >         val cols = s.split("\t")
>> >         (cols(0), cols(1))
>> >     }
>> >     val merge = pair1.union(pair2)
>> >     val result = merge.reduceByKey(_ + _)
>> >     val outputPath = new Path("/user/xxx/temp/spark-output")
>> >     outputPath.getFileSystem(new Configuration()).delete(outputPath,
>> true)
>> >     result.saveAsTextFile(outputPath.toString)
>> >
>> >     System.exit(0)
>> >   }
>> >
>> > }
>> >
>> > My spark version is 0.9 and I run my job use this command
>> "/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
>> ./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args
>> yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g
>> --worker-cores 2"
>> >
>>
>
>

Re:

Posted by Hahn Jiang <ha...@gmail.com>.
I set "ulimit -n 100000" in conf/spark-env.sh, is it too small?


On Thu, Mar 27, 2014 at 3:36 PM, Sonal Goyal <so...@gmail.com> wrote:

> Hi Hahn,
>
> What's the ulimit on your systems? Please check the following link for a
> discussion on the too many files open.
>
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3CCANGvG8qpn_WLLsRcJEGDB7HMza2uX7mYxZhfvTZ+b-sDxdKRUg@mail.gmail.com%3E
>
>
> Sent from my iPad
>
> > On Mar 27, 2014, at 12:15 PM, Hahn Jiang <ha...@gmail.com>
> wrote:
> >
> > Hi, all
> >
> > I write a spark program on yarn. When I use small size input file, my
> program can run well. But my job will failed if input size is more than 40G.
> >
> > the error log:
> > java.io.FileNotFoundException (java.io.FileNotFoundException:
> /home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890
> (Too many open files))
> > java.io.FileOutputStream.openAppend(Native Method)
> > java.io.FileOutputStream.<init>(FileOutputStream.java:192)
> >
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
> >
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
> >
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
> >
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >
> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
> > org.apache.spark.scheduler.Task.run(Task.scala:53)
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > java.lang.Thread.run(Thread.java:662)
> >
> >
> > my object:
> > object Test {
> >
> >   def main(args: Array[String]) {
> >     val sc = new SparkContext(args(0), "Test",
> >       System.getenv("SPARK_HOME"),
> SparkContext.jarOfClass(this.getClass))
> >
> >     val mg = sc.textFile("/user/.../part-*")
> >     val mct = sc.textFile("/user/.../part-*")
> >
> >     val pair1 = mg.map {
> >       s =>
> >         val cols = s.split("\t")
> >         (cols(0), cols(1))
> >     }
> >     val pair2 = mct.map {
> >       s =>
> >         val cols = s.split("\t")
> >         (cols(0), cols(1))
> >     }
> >     val merge = pair1.union(pair2)
> >     val result = merge.reduceByKey(_ + _)
> >     val outputPath = new Path("/user/xxx/temp/spark-output")
> >     outputPath.getFileSystem(new Configuration()).delete(outputPath,
> true)
> >     result.saveAsTextFile(outputPath.toString)
> >
> >     System.exit(0)
> >   }
> >
> > }
> >
> > My spark version is 0.9 and I run my job use this command
> "/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
> ./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args
> yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g
> --worker-cores 2"
> >
>