Posted to user@spark.apache.org by ja...@centrum.cz on 2014/11/02 10:35:10 UTC

Spark on Yarn probably trying to load all the data to RAM

Hi,

I am using Spark on Yarn, particularly Spark in Python. I am trying to run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")
myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything into RAM, or at least after a while of running this everything slows down and then I get the errors in the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to handle this without storing everything in RAM. So I would like to ask whether I am missing some setting in Spark on Yarn?


Thank you in advance for any help.


14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
11744,575: [Full GC 1194515K->1192839K(1365504K), 2,2367150 secs]
11746,814: [Full GC 1194507K->1193186K(1365504K), 2,1788150 secs]
11748,995: [Full GC 1194507K->1193278K(1365504K), 1,3511480 secs]
11750,347: [Full GC 1194507K->1193263K(1365504K), 2,2735350 secs]
11752,622: [Full GC 1194506K->1193192K(1365504K), 1,2700110 secs]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 391, in getNumPartitions
    return self._jrdd.partitions().size()
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2
: An error occurred while calling o112.partitions.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
 
>>> 11753,896: [Full GC 1194506K->947839K(1365504K), 2,1483780 secs]
14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/11/01 22:07:09 INFO Remoting: Remoting shut down
14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@5ca1c790
14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@5ca1c790
java.nio.channels.CancelledKeyException
 at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)
14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not found
14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application already ended: FINISHED
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
14/11/01 22:07:10 INFO ui.SparkUI: Stopped Spark web UI at http://ip-172-31-20-69.us-west-2.compute.internal:4040
14/11/01 22:07:10 INFO scheduler.DAGScheduler: Stopping DAGScheduler
14/11/01 22:07:10 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
14/11/01 22:07:10 INFO cluster.YarnClientSchedulerBackend: Stopped
14/11/01 22:07:11 ERROR spark.MapOutputTrackerMaster: Error communicating with MapOutputTracker
akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/MapOutputTracker#132167931]] had already been terminated.
 at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
 at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:106)
 at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:117)
 at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
 at org.apache.spark.SparkEnv.stop(SparkEnv.scala:80)
 at org.apache.spark.SparkContext.stop(SparkContext.scala:1024)
 at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:131)
Exception in thread "Yarn Application State Checker" org.apache.spark.SparkException: Error communicating with MapOutputTracker
 at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:111)
 at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:117)
 at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
 at org.apache.spark.SparkEnv.stop(SparkEnv.scala:80)
 at org.apache.spark.SparkContext.stop(SparkContext.scala:1024)
 at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:131)
Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/MapOutputTracker#132167931]] had already been terminated.
 at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
 at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:106)
 ... 5 more 
 


Re: Spark on Yarn probably trying to load all the data to RAM

Posted by ja...@centrum.cz.
Could you please give me an example, or send me a link, showing how to use Hadoop's CombineFileInputFormat? It sounds very interesting and would probably save several hours of my pipeline's computation; merging the files is currently the bottleneck in my system.
______________________________________________________________


Another option would be to use Hadoop's CombineFileInputFormat with an input split size of, say, 512 MB or 1 GB. That way you don't need a preceding merge step and its extra I/O of first combining the files.
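A minimal PySpark sketch of that idea (not from this thread): it assumes the pyspark shell's sc, a Hadoop version that ships org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat, and a 512 MB maximum split size chosen arbitrarily. The bucket path is the one from the original post.

# Pack many small files into each input split, capped at ~512 MB per split.
conf = {"mapreduce.input.fileinputformat.split.maxsize": str(512 * 1024 * 1024)}

combined = sc.newAPIHadoopFile(
    "s3n://mybucket/files/*/*/*.json",
    "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=conf)

# Records come back as (byte offset, line) pairs; keep only the line text.
lines = combined.map(lambda kv: kv[1])
print(lines.getNumPartitions())   # far fewer partitions than input files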

Re: Spark on Yarn probably trying to load all the data to RAM

Posted by ja...@centrum.cz.
OK, so the problem is solved: the file was gzipped, and it looks like Spark does not support distributing a single .gz file across the workers.

Thank you very much for the suggestion to merge the files.

Best regards,
Jan 
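For reference, gzip is not a splittable codec, so Spark reads a single .gz file as a single partition handled by one task, which matches the behaviour described here. A small sketch (the paths are made up and the partition count is arbitrary) of rewriting such a file once so that later jobs can read it in parallel, assuming the pyspark shell's sc:

merged = sc.textFile("s3n://mybucket/merged/all.json.gz")
print(merged.getNumPartitions())   # typically 1 for a single gzip file

# Shuffle into more partitions and store the result uncompressed (splittable),
# so subsequent jobs get their parallelism back.
merged.repartition(200).saveAsTextFile("s3n://mybucket/merged-plain/")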

Re: Spark on Yarn probably trying to load all the data to RAM

Posted by ja...@centrum.cz.
I have tried merging the files into one, and Spark now works with RAM as I expected.

Unfortunately, after doing this another problem appears. Spark running on YARN now schedules all the work to only one worker node, as one big job. Is there some way to force Spark and Yarn to schedule the work uniformly across the whole cluster?
 
I am running the job with the following command:
./spark/bin/spark-submit --master yarn-client --py-files /home/hadoop/my_pavkage.zip  /home/hadoop/preprocessor.py 
 
I have also tried playing with the --num-executors and --executor-cores options, but unfortunately I am not able to force Spark to run the job on more than one cluster node.
 
Thank you in advance for any advice,
Best regards,
Jan 
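One possible explanation (an assumption, not confirmed in this thread): if the merged input ends up as a single partition, for example one large file on a non-splittable codec, there is only one task, so only one executor can get work no matter what --num-executors is set to. A minimal sketch of checking and fixing that in the pyspark shell (the path and the partition count of 200 are made up):

rdd = sc.textFile("s3n://mybucket/merged/all.json")
print(rdd.getNumPartitions())   # if this prints 1, only one task can ever run

rdd = rdd.repartition(200)      # full shuffle into 200 partitions
# continue the pipeline on `rdd`; tasks can now be scheduled on all executors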

Re: Spark on Yarn probably trying to load all the data to RAM

Posted by Davies Liu <da...@databricks.com>.
This is a crazy case: with a few million files, the scheduler will run out of
memory. By default, each file becomes a partition, so you will have more than
1M partitions and also 1M tasks. coalesce() will reduce the number of tasks,
but it cannot reduce the number of partitions of the original RDD.

Could you pack the small files into bigger ones? Spark works much better with
larger files than with many small ones.
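A short illustration of that point in the pyspark shell (the path is the one from the original post, the coalesce target of 1000 is arbitrary):

rdd = sc.textFile("s3n://mybucket/files/*/*/*.json")   # roughly one partition per small file
fewer = rdd.coalesce(1000)                             # downstream stages run ~1000 tasks
print(fewer.getNumPartitions())                        # 1000

# The driver still has to list and track every partition (input split) of the
# original RDD, so with millions of files the driver heap can still blow up.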



Re: Spark on Yarn probably trying to load all the data to RAM

Posted by ja...@centrum.cz.
I have 3 datasets; in all of them the average file size is 10-12 KB.
I am able to run my code on the dataset with 70K files, but I am not able to run it on the datasets with 1.1M and 3.8M files.
______________________________________________________________


On Sun, Nov 2, 2014 at 1:35 AM,  <ja...@centrum.cz> wrote:
> Hi,
>
> I am using Spark on Yarn, particularly Spark in Python. I am trying to run:
>
> myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")

How many files do you have, and what is the average size of each file?

> myrdd.getNumPartitions()
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org




Re: Spark on Yarn probably trying to load all the data to RAM

Posted by Davies Liu <da...@databricks.com>.
On Sun, Nov 2, 2014 at 1:35 AM,  <ja...@centrum.cz> wrote:
> Hi,
>
> I am using Spark on Yarn, particularly Spark in Python. I am trying to run:
>
> myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")

How many files do you have, and what is the average size of each file?
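One quick way to get those numbers, assuming the boto library and S3 credentials are configured on the machine (an assumption for illustration, not something mentioned in this thread):

import boto

conn = boto.connect_s3()
bucket = conn.get_bucket("mybucket")          # bucket name taken from the snippet above

count = 0
total_bytes = 0
for key in bucket.list(prefix="files/"):      # lazily iterates over all objects under the prefix
    if key.name.endswith(".json"):
        count += 1
        total_bytes += key.size

print("%d files, average size %d bytes" % (count, total_bytes // max(count, 1)))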

> myrdd.getNumPartitions()
>
> Unfortunately it seems that Spark tries to load everything to RAM, or at
> least after while of running this everything slows down and then I am
> getting errors with log below. Everything works fine for datasets smaller
> than RAM, but I would expect Spark doing this without storing everything to
> RAM. So I would like to ask if I'm not missing some settings in Spark on
> Yarn?
>
>
> Thank you in advance for any help.
>
>
> 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
> thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down
> ActorSystem [sparkDriver]
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> 14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
> thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
> ActorSystem [sparkDriver]
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> 11744,575: [Full GC 1194515K->1192839K(1365504K), 2,2367150 secs]
>
> 11746,814: [Full GC 1194507K->1193186K(1365504K), 2,1788150 secs]
>
> 11748,995: [Full GC 1194507K->1193278K(1365504K), 1,3511480 secs]
>
> 11750,347: [Full GC 1194507K->1193263K(1365504K), 2,2735350 secs]
>
> 11752,622: [Full GC 1194506K->1193192K(1365504K), 1,2700110 secs]
>
> Traceback (most recent call last):
>
>   File "<stdin>", line 1, in <module>
>
>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 391, in
> getNumPartitions
>
>     return self._jrdd.partitions().size()
>
>   File
> "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>
>   File
> "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
>
> py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler:
> Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2
>
> : An error occurred while calling o112.partitions.
>
> : java.lang.OutOfMemoryError: GC overhead limit exceeded
>
>
>
>>>> 11753,896: [Full GC 1194506K->947839K(1365504K), 2,1483780 secs]
>
> 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Shutting down remote daemon.
>
> 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
> thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
> ActorSystem [sparkDriver]
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> 14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
> thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down
> ActorSystem [sparkDriver]
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remote daemon shut down; proceeding with flushing remote transports.
>
> 14/11/01 22:07:09 INFO Remoting: Remoting shut down
>
> 14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
> Remoting shut down.
>
> 14/11/01 22:07:09 INFO network.ConnectionManager: Removing
> ReceivingConnection to
> ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
>
> 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
>
> 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
>
> 14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ?
> sun.nio.ch.SelectionKeyImpl@5ca1c790
>
> 14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ?
> sun.nio.ch.SelectionKeyImpl@5ca1c790
>
> java.nio.channels.CancelledKeyException
>
> at
> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
>
> at
> org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
>
> 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
> to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)
>
> 14/11/01 22:07:09 INFO network.ConnectionManager: Removing
> ReceivingConnection to
> ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)
>
> 14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding
> SendingConnection to
> ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not
> found
>
> 14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application
> already ended: FINISHED
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/metrics/json,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/static,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/executors/json,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/executors,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/environment/json,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/environment,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/storage/rdd,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/storage/json,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/storage,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/pool/json,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/pool,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/stage/json,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/stage,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages/json,null}
>
> 14/11/01 22:07:10 INFO handler.ContextHandler: stopped
> o.e.j.s.ServletContextHandler{/stages,null}
>
> 14/11/01 22:07:10 INFO ui.SparkUI: Stopped Spark web UI at
> http://ip-172-31-20-69.us-west-2.compute.internal:4040
>
> 14/11/01 22:07:10 INFO scheduler.DAGScheduler: Stopping DAGScheduler
>
> 14/11/01 22:07:10 INFO cluster.YarnClientSchedulerBackend: Shutting down all
> executors
>
> 14/11/01 22:07:10 INFO cluster.YarnClientSchedulerBackend: Stopped
>
> 14/11/01 22:07:11 ERROR spark.MapOutputTrackerMaster: Error communicating
> with MapOutputTracker
>
> akka.pattern.AskTimeoutException:
> Recipient[Actor[akka://sparkDriver/user/MapOutputTracker#132167931]] had
> already been terminated.
>
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
>
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:106)
>
> at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:117)
>
> at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
>
> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:80)
>
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1024)
>
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:131)
>
> Exception in thread "Yarn Application State Checker"
> org.apache.spark.SparkException: Error communicating with MapOutputTracker
>
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:111)
>
> at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:117)
>
> at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324)
>
> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:80)
>
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1024)
>
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:131)
>
> Caused by: akka.pattern.AskTimeoutException:
> Recipient[Actor[akka://sparkDriver/user/MapOutputTracker#132167931]] had
> already been terminated.
>
> at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
>
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:106)
>
> ... 5 more
>
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org