Posted to user@spark.apache.org by Henry Tremblay <pa...@gmail.com> on 2017/02/25 19:33:18 UTC

Spark runs out of memory with small file

I am reading in a single small file from Hadoop with wholeTextFiles. If I
process each line and create a row with two cells, the first cell equal
to the name of the file and the second cell equal to the line, that code
runs fine.

But if I just add two lines of code and change the first cell based on
parsing a line, Spark runs out of memory. Any idea why such a simple
process, which would succeed quickly in a non-Spark application, fails?

Thanks!

Henry

CODE:

[hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
3816096 
/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz


In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
In [2]: rdd1.count()
Out[2]: 1


In [4]: def process_file(s):
    ...:     text = s[1]
    ...:     the_id = s[0]
    ...:     d = {}
    ...:     l =  text.split("\n")
    ...:     final = []
    ...:     for line in l:
    ...:         d[the_id] = line
    ...:         final.append(Row(**d))
    ...:     return final
    ...:

In [5]: rdd2 = rdd1.map(process_file)

In [6]: rdd2.count()
Out[6]: 1

In [7]: rdd3 = rdd2.flatMap(lambda x: x)

In [8]: rdd3.count()
Out[8]: 508310

In [9]: rdd3.take(1)
Out[9]: 
[Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]

In [10]: def process_file(s):
     ...:     text = s[1]
     ...:     d = {}
     ...:     l =  text.split("\n")
     ...:     final = []
     ...:     the_id = "init"
     ...:     for line in l:
     ...:         if line[0:15] == 'WARC-Record-ID:':
     ...:             the_id = line[15:]
     ...:         d[the_id] = line
     ...:         final.append(Row(**d))
     ...:     return final

In [12]: rdd2 = rdd1.map(process_file)

In [13]: rdd2.count()
17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on 
ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for 
exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: 
Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB 
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 
5, ip-172-31-41-89.us-west-2.compute.internal, executor 5): 
ExecutorLostFailure (executor 5 exited caused by one of the running 
tasks) Reason: Container killed by YARN for exceeding memory limits. 
10.3 GB of 10.3 GB physical memory used. Consider boosting 
spark.yarn.executor.memoryOverhead.
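
For reference, the overhead that the log message refers to can be raised
before the SparkContext is created. A minimal sketch, assuming a fresh
PySpark session; the 2048 MB value is only an illustration, not a tuned
recommendation:

# Hypothetical example: raise the per-executor memory overhead on YARN.
# The value is in megabytes and only takes effect for a SparkContext
# created with this conf.
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.yarn.executor.memoryOverhead", "2048")
sc = SparkContext(conf=conf)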


-- 
Henry Tremblay
Robert Half Technology


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Spark runs out of memory with small file

Posted by Henry Tremblay <pa...@gmail.com>.
I am actually using Spark 2.1 and trying to solve a real-life problem.
Unfortunately, some of the discussion of my problem went offline, and
then I started a new thread.

Here is my problem. I am parsing crawl data which exists in a flat file 
format. It looks like this:

u'WARC/1.0',
  u'WARC-Type: warcinfo',
  u'WARC-Date: 2016-12-08T13:00:23Z',
  u'WARC-Record-ID: <urn:uuid:f609f246-df68-46ef-a1c5-2f66e833ffd6>',
  u'Content-Length: 344',
  u'Content-Type: application/warc-fields',
  u'WARC-Filename: 
CC-MAIN-20161202170900-00000-ip-10-31-129-80.ec2.internal.warc.gz',
  u'',
  u'robots: classic',
  u'hostname: ip-10-31-129-80.ec2.internal',
  u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
  u'isPartOf: CC-MAIN-2016-50',
  u'operator: CommonCrawl Admin',
  u'description: Wide crawl of the web for November 2016',
  u'publisher: CommonCrawl',
  u'format: WARC File Format 1.0',
  u'conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf',
  u'',
  u'',
  u'WARC/1.0',
  u'WARC-Type: request',
  u'WARC-Date: 2016-12-02T17:54:09Z',
  u'WARC-Record-ID: <urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b>',
  u'Content-Length: 220',
  u'Content-Type: application/http; msgtype=request',
  u'WARC-Warcinfo-ID: <urn:uuid:f609f246-df68-46ef-a1c5-2f66e833ffd6>',
  u'WARC-IP-Address: 217.197.115.133',
  u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
  u'',
  u'GET /blog/ HTTP/1.0',
  u'Host: 1018201.vkrugudruzei.ru',
  u'Accept-Encoding: x-gzip, gzip, deflate',
  u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)',
  u'Accept: 
text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
  u'',
  u'',
  u'',
  u'WARC/1.0',
  u'WARC-Type: response',
  u'WARC-Date: 2016-12-02T17:54:09Z',
  u'WARC-Record-ID: <urn:uuid:4c5e6d1a-e64f-4b6e-8101-c5e46feb84a0>',
  u'Content-Length: 577',
  u'Content-Type: application/http; msgtype=response',
  u'WARC-Warcinfo-ID: <urn:uuid:f609f246-df68-46ef-a1c5-2f66e833ffd6>',
  u'WARC-Concurrent-To: <urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b>',
  u'WARC-IP-Address: 217.197.115.133',
  u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
  u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
  u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
  u'']

I want to turn it into something like this:

Row(warc_type='request', warc_date='2016-12-02',
warc_record_id='<urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b...')

In other words, I want to turn rows into columns. There are no keywords 
in the flat file.

From there I can read it in as a DataFrame.
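
A minimal sketch of that idea, assuming pyspark.sql.Row and treating one
record as a list of 'Key: value' header lines; the helper name and the
lowercase/underscore field names are illustrative only, not code from this
thread:

from pyspark.sql import Row

def headers_to_row(record_lines):
    # Collect the 'Key: value' header lines of one WARC record into a dict,
    # lowercasing keys and replacing '-' with '_' so they are valid Row
    # field names, then build a Row from it.
    fields = {}
    for line in record_lines:
        if ':' in line:
            key, _, value = line.partition(':')
            fields[key.strip().lower().replace('-', '_')] = value.strip()
    return Row(**fields)

# For example, with two of the header lines shown above:
row = headers_to_row([u'WARC-Type: request',
                      u'WARC-Date: 2016-12-02T17:54:09Z'])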


On 02/26/2017 12:14 PM, Gourav Sengupta wrote:
> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for 
> their exams. Learning SPARK is a VERY VERY VERY old way of solving 
> problems using SPARK.
>
> The core engine of SPARK, which even I understand, has gone through 
> several fundamental changes.
>
> Just try reading the file using dataframes and try using SPARK 2.1.
>
> In other words, it may be of tremendous benefit if you were learning to 
> solve problems which exist rather than problems which do not exist 
> any more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay 
> <paulhtremblay@gmail.com <ma...@gmail.com>> wrote:
>
>     The file is so small that a standalone Python script, independent
>     of Spark, can process the file in under a second.
>
>     Also, the following fails:
>
>     1. Read the whole file in with wholeTextFiles.
>
>     2. Use flatMap to get 50,000 rows that look like: Row(id="path",
>     line="line").
>
>     3. Save the results as CSV to HDFS.
>
>     4. Read the files (there are 20) from HDFS into a df using
>     sqlContext.read.csv(<path>).
>
>     5. Convert the df to an rdd.
>
>     6. Create key-value pairs with the key being the file path and the
>     value being the line.
>
>     7. Iterate through the values.
>
>     What happens is Spark either runs out of memory, or, in my last
>     try with a slight variation, just hangs for 12 hours.
>
>     Henry
>
>
>     On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>     Hi, Tremblay.
>>     Your file is .gz format, which is not splittable for hadoop.
>>     Perhaps the file is loaded by only one executor.
>>     How many executors do you start?
>>     Perhaps repartition method could solve it, I guess.
>>
>
>     -- 
>     Henry Tremblay
>     Robert Half Technology
>
>

-- 
Henry Tremblay
Robert Half Technology


Re: Spark runs out of memory with small file

Posted by Henry Tremblay <pa...@gmail.com>.
Cool! Now I understand how to approach this problem. At my last 
position, I don't think we did it quite efficiently. Maybe a blog post 
by me?

Henry


On 02/28/2017 01:22 AM, 颜发才(Yan Facai) wrote:
> Google is your friend, Henry.
> http://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions
>
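
For context on that link, here is a tiny illustration of the difference,
assuming an existing SparkContext sc; it is not code from this thread:

# map: the function is called once per element.
# mapPartitions: the function is called once per partition and receives an
# iterator over that partition's elements, so it can keep state while it
# walks through them.
nums = sc.parallelize([1, 2, 3, 4], 2)
per_element = nums.map(lambda x: x * 2).collect()                    # [2, 4, 6, 8]
per_partition = nums.mapPartitions(lambda it: [sum(it)]).collect()   # [3, 7]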

-- 
Henry Tremblay
Robert Half Technology


Re: Spark runs out of memory with small file

Posted by Henry Tremblay <pa...@gmail.com>.
Thanks! That works:

def process_file(my_iter):
     the_id = "init"
     final = []
     for chunk in my_iter:
         lines = chunk[1].split("\n")
         for line in lines:
             if line[0:15] == 'WARC-Record-ID:':
                 the_id = line[15:]
             final.append(Row(the_id = the_id, line = line))
     return iter(final)

rdd2 = rdd.mapPartitions(process_file)

Can anyone explain why this solution works? I am aware that an iterator 
is lazily evaluated, but my exact understanding in this case is vague.
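
One possible reading, offered as a sketch rather than a verified diagnosis:
mapPartitions calls process_file once per partition with an iterator over
all of that partition's (path, text) pairs, so the_id assigned on a
'WARC-Record-ID:' line is still in scope for the lines that follow, and
every emitted Row has exactly the same two fields (the_id and line). The
earlier version built Row(**d) from a dict that gained a new key for every
record seen so far, so later rows carried more and more columns. The same
per-partition state can also be written with yield, along the lines ayan
suggested; the name process_file_gen is illustrative:

from pyspark.sql import Row

def process_file_gen(my_iter):
    # my_iter yields (path, whole_file_text) pairs from wholeTextFiles.
    the_id = "init"
    for chunk in my_iter:
        for line in chunk[1].split("\n"):
            if line.startswith('WARC-Record-ID:'):
                the_id = line[15:]
            yield Row(the_id=the_id, line=line)

# mapPartitions accepts any function that returns an iterable; a generator
# function qualifies.
rdd2 = rdd.mapPartitions(process_file_gen)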

Henry


On 02/27/2017 12:50 AM, 颜发才(Yan Facai) wrote:
> Hi, Tremblay,
> map processes text line by line, so it is not the method you need.
>
> However,
> mapPartition and iterator can help you maintain a state.
> like:
> http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#mapPartitions

-- 
Henry Tremblay
Robert Half Technology


Re: Spark runs out of memory with small file

Posted by Henry Tremblay <pa...@gmail.com>.
This won't work:

rdd2 = rdd.flatMap(splitf)

rdd2.take(1)

[u'WARC/1.0\r']

rdd2.count()

508310

If I then try to apply a map to rdd2, the map only works on each 
individual line. I need to create a state machine as in my second 
function. That is, I need to apply a key to each line, but the key is 
determined by a previous line.

My first function below always has the same id. That was the point: to 
show that the first function succeeded while the second failed. In the 
second function the dictionary grows, but it has at most 508,310 keys; in 
fact, most likely it will have only about 1/10th of that or less. I used 
the exact same code with the same file in pure Python, without Spark, and 
the process ran in under 1 second.

Thanks!


Henry

On 02/26/2017 11:37 PM, Pavel Plotnikov wrote:
>
> Hi, Henry
>
> In the first example the dict d always contains only one value because 
> the_id is always the same; in the second case the dict grows very quickly.
> So, I suggest first applying a map function to split your file of strings 
> into rows, then repartitioning, and then applying the custom logic.
>
> Example:
>
> def splitf(s):
>     return s.split("\n")
>
> rdd.flatMap(splitf).repartition(1000).map(your_function)
>
> Best,
> Pavel

-- 
Henry Tremblay
Robert Half Technology


Re: Spark runs out of memory with small file

Posted by Pavel Plotnikov <pa...@team.wrike.com>.
Hi, Henry

In the first example the dict d always contains only one value because the_id
is always the same; in the second case the dict grows very quickly.
So, I suggest first applying a map function to split your file of strings
into rows, then repartitioning, and then applying the custom logic.

Example:

def splitf(s):
    return s.split("\n")

rdd.flatMap(splitf).repartition(1000).map(your_function)

Best,
Pavel
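
A small sketch of how these pieces fit together, assuming rdd comes from
sc.wholeTextFiles (so each element is a (path, text) pair) and with a
hypothetical tag_line standing in for the custom per-line logic; as Henry's
reply notes, a plain per-line map cannot carry the WARC-Record-ID state
from one line to the next:

from pyspark.sql import Row

def splitf(pair):
    # pair is (path, whole_file_text); split the text into lines.
    return pair[1].split("\n")

def tag_line(line):
    # Hypothetical per-line function standing in for "your_function".
    return Row(line=line)

rdd2 = rdd.flatMap(splitf).repartition(1000).map(tag_line)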

On Mon, 27 Feb 2017, 06:28 Henry Tremblay, <pa...@gmail.com> wrote:

> Not sure where you want me to put yield. My first try caused an error in
> Spark that it could not pickle generator objects.

Re: Spark runs out of memory with small file

Posted by Henry Tremblay <pa...@gmail.com>.
Not sure where you want me to put yield. My first try caused an error in 
Spark that it could not pickle generator objects.
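
For what it's worth, one reading of ayan's suggestion (quoted below),
sketched rather than tested: the yield goes inside the function that is
handed to flatMap, so the function returns a generator and Spark simply
iterates over it, much like the mapPartitions version that worked. The
name parse_file and the use of rdd1 from the first message are
illustrative:

from pyspark.sql import Row

def parse_file(pair):
    # pair is (path, whole_file_text) from wholeTextFiles.
    the_id = "init"
    for line in pair[1].split("\n"):
        if line.startswith('WARC-Record-ID:'):
            the_id = line[15:]
        yield Row(the_id=the_id, line=line)

rdd2 = rdd1.flatMap(parse_file)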


On 02/26/2017 03:25 PM, ayan guha wrote:
> Hi
>
> We are doing similar stuff, but with large number of small-ish files. 
> What we do is write a function to parse a complete file, similar to 
> your parse file. But we use yield, instead of return and flatmap on 
> top of it. Can you give it a try and let us know if it works?
>
> On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <koert@tresata.com 
> <ma...@tresata.com>> wrote:
>
>     using wholeFiles to process formats that can not be split per line
>     is not "old"
>
>     and there are plenty of problems for which RDD is still better
>     suited than Dataset or DataFrame currently (this might change in
>     near future when Dataset gets some crucial optimizations fixed).
>
>     On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta
>     <gourav.sengupta@gmail.com <ma...@gmail.com>> wrote:
>
>         Hi Henry,
>
>         Those guys in Databricks training are nuts and still use Spark
>         1.x for their exams. Learning SPARK is a VERY VERY VERY old
>         way of solving problems using SPARK.
>
>         The core engine of SPARK, which even I understand, has gone
>         through several fundamental changes.
>
>         Just try reading the file using dataframes and try using SPARK
>         2.1.
>
>         In other words it may be of tremendous benefit if you were
>         learning to solve problems which exists rather than problems
>         which does not exist any more.
>
>         Please let me know in case I can be of any further help.
>
>         Regards,
>         Gourav
>
>         On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay
>         <paulhtremblay@gmail.com <ma...@gmail.com>> wrote:
>
>             The file is so small that a stand alone python script,
>             independent of spark, can process the file in under a second.
>
>             Also, the following fails:
>
>             1. Read the whole file in with wholeFiles
>
>             2. use flatMap to get 50,000 rows that looks like:
>             Row(id="path", line="line")
>
>             3. Save the results as CVS to HDFS
>
>             4. Read the files (there are 20) from HDFS into a df using
>             sqlContext.read.csv(<path>)
>
>             5. Convert the df to an rdd.
>
>             6 Create key value pairs with the key being the file path
>             and the value being the line.
>
>             7 Iterate through values
>
>             What happens is Spark either runs out of memory, or, in my
>             last try with a slight variation, just hangs for 12 hours.
>
>             Henry
>
>
>             On 02/26/2017 03:31 AM, 颜发才 (Yan Facai) wrote:
>>             Hi, Tremblay.
>>             Your file is .gz format, which is not splittable for
>>             hadoop. Perhaps the file is loaded by only one executor.
>>             How many executors do you start?
>>             Perhaps repartition method could solve it, I guess.
>>
>>
>>             On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay
>>             <paulhtremblay@gmail.com
>>             <ma...@gmail.com>> wrote:
>>
>>                 I am reading in a single small file from hadoop with
>>                 wholeText. If I process each line and create a row
>>                 with two cells, the first cell equal to the name of
>>                 the file, the second cell equal to the line. That
>>                 code runs fine.
>>
>>                 But if I just add two line of code and change the
>>                 first cell based on parsing a line, spark runs out of
>>                 memory. Any idea why such a simple process that would
>>                 succeed quickly in a non spark application fails?
>>
>>                 Thanks!
>>
>>                 Henry
>>
>>                 CODE:
>>
>>                 [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>>                 3816096
>>                 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>>
>>
>>                 In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>>                 In [2]: rdd1.count()
>>                 Out[2]: 1
>>
>>
>>                 In [4]: def process_file(s):
>>                    ...:     text = s[1]
>>                    ...:     the_id = s[0]
>>                    ...:     d = {}
>>                    ...:     l = text.split("\n")
>>                    ...:     final = []
>>                    ...:     for line in l:
>>                    ...:         d[the_id] = line
>>                    ...:  final.append(Row(**d))
>>                    ...:     return final
>>                    ...:
>>
>>                 In [5]: rdd2 = rdd1.map(process_file)
>>
>>                 In [6]: rdd2.count()
>>                 Out[6]: 1
>>
>>                 In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>
>>                 In [8]: rdd3.count()
>>                 Out[8]: 508310
>>
>>                 In [9]: rdd3.take(1)
>>                 Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>>
>>                 In [10]: def process_file(s):
>>                     ...:     text = s[1]
>>                     ...:     d = {}
>>                     ...:     l = text.split("\n")
>>                     ...:     final = []
>>                     ...:     the_id = "init"
>>                     ...:     for line in l:
>>                     ...:         if line[0:15] == 'WARC-Record-ID:':
>>                     ...:  the_id = line[15:]
>>                     ...:         d[the_id] = line
>>                     ...:  final.append(Row(**d))
>>                     ...:     return final
>>
>>                 In [12]: rdd2 = rdd1.map(process_file)
>>
>>                 In [13]: rdd2.count()
>>                 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor
>>                 5 on ip-172-31-41-89.us-west-2.compute.internal:
>>                 Container killed by YARN for exceeding memory limits.
>>                 10.3 GB of 10.3 GB physical memory used. Consider
>>                 boosting spark.yarn.executor.memoryOverhead.
>>                 17/02/25 19:03:03 WARN
>>                 YarnSchedulerBackend$YarnSchedulerEndpoint: Container
>>                 killed by YARN for exceeding memory limits. 10.3 GB
>>                 of 10.3 GB physical memory used. Consider boosting
>>                 spark.yarn.executor.memoryOverhead.
>>                 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0
>>                 in stage 5.0 (TID 5,
>>                 ip-172-31-41-89.us-west-2.compute.internal, executor
>>                 5): ExecutorLostFailure (executor 5 exited caused by
>>                 one of the running tasks) Reason: Container killed by
>>                 YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>>                 physical memory used. Consider boosting
>>                 spark.yarn.executor.memoryOverhead.
>>
>>
>>                 -- 
>>                 Henry Tremblay
>>                 Robert Half Technology
>>
>>
>>                 ---------------------------------------------------------------------
>>                 To unsubscribe e-mail:
>>                 user-unsubscribe@spark.apache.org
>>                 <ma...@spark.apache.org>
>>
>>
>
>             -- 
>             Henry Tremblay
>             Robert Half Technology
>
>
>
>
>
>
> -- 
> Best Regards,
> Ayan Guha

-- 
Henry Tremblay
Robert Half Technology


Re: Spark runs out of memory with small file

Posted by ayan guha <gu...@gmail.com>.
Hi

We are doing similar stuff, but with a large number of small-ish files. What
we do is write a function to parse a complete file, similar to your parse
function, but we use yield instead of return and flatMap on top of it. Can you
give it a try and let us know if it works?
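
Something along these lines (just a sketch, untested, reusing the rdd1 name
from your session):

from pyspark.sql import Row

def process_file(s):
    text = s[1]
    the_id = "init"
    for line in text.split("\n"):
        if line[0:15] == 'WARC-Record-ID:':
            the_id = line[15:]
        # build a fresh one-key dict per line; note that in your original loop the
        # shared dict d keeps every WARC-Record-ID it has ever seen, so each Row
        # grows by one column per record, which may well be the memory blow-up
        yield Row(**{the_id: line})

rdd2 = rdd1.flatMap(process_file)   # flatMap consumes the generator on the executors
rdd2.count()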

On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers <ko...@tresata.com> wrote:

> using wholeFiles to process formats that can not be split per line is not
> "old"
>
> and there are plenty of problems for which RDD is still better suited than
> Dataset or DataFrame currently (this might change in near future when
> Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <
> gourav.sengupta@gmail.com> wrote:
>
>> Hi Henry,
>>
>> Those guys in Databricks training are nuts and still use Spark 1.x for
>> their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
>> using SPARK.
>>
>> The core engine of SPARK, which even I understand, has gone through
>> several fundamental changes.
>>
>> Just try reading the file using dataframes and try using SPARK 2.1.
>>
>> In other words it may be of tremendous benefit if you were learning to
>> solve problems which exists rather than problems which does not exist any
>> more.
>>
>> Please let me know in case I can be of any further help.
>>
>> Regards,
>> Gourav
>>
>> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <pa...@gmail.com>
>> wrote:
>>
>>> The file is so small that a stand alone python script, independent of
>>> spark, can process the file in under a second.
>>>
>>> Also, the following fails:
>>>
>>> 1. Read the whole file in with wholeFiles
>>>
>>> 2. use flatMap to get 50,000 rows that looks like: Row(id="path",
>>> line="line")
>>>
>>> 3. Save the results as CVS to HDFS
>>>
>>> 4. Read the files (there are 20) from HDFS into a df using
>>> sqlContext.read.csv(<path>)
>>>
>>> 5. Convert the df to an rdd.
>>>
>>> 6 Create key value pairs with the key being the file path and the value
>>> being the line.
>>>
>>> 7 Iterate through values
>>>
>>> What happens is Spark either runs out of memory, or, in my last try with
>>> a slight variation, just hangs for 12 hours.
>>>
>>> Henry
>>>
>>> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>>
>>> Hi, Tremblay.
>>> Your file is .gz format, which is not splittable for hadoop. Perhaps the
>>> file is loaded by only one executor.
>>> How many executors do you start?
>>> Perhaps repartition method could solve it, I guess.
>>>
>>>
>>> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <paulhtremblay@gmail.com
>>> > wrote:
>>>
>>>> I am reading in a single small file from hadoop with wholeText. If I
>>>> process each line and create a row with two cells, the first cell equal to
>>>> the name of the file, the second cell equal to the line. That code runs
>>>> fine.
>>>>
>>>> But if I just add two line of code and change the first cell based on
>>>> parsing a line, spark runs out of memory. Any idea why such a simple
>>>> process that would succeed quickly in a non spark application fails?
>>>>
>>>> Thanks!
>>>>
>>>> Henry
>>>>
>>>> CODE:
>>>>
>>>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>>>> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.i
>>>> nternal.warc.gz
>>>>
>>>>
>>>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>>>> In [2]: rdd1.count()
>>>> Out[2]: 1
>>>>
>>>>
>>>> In [4]: def process_file(s):
>>>>    ...:     text = s[1]
>>>>    ...:     the_id = s[0]
>>>>    ...:     d = {}
>>>>    ...:     l =  text.split("\n")
>>>>    ...:     final = []
>>>>    ...:     for line in l:
>>>>    ...:         d[the_id] = line
>>>>    ...:         final.append(Row(**d))
>>>>    ...:     return final
>>>>    ...:
>>>>
>>>> In [5]: rdd2 = rdd1.map(process_file)
>>>>
>>>> In [6]: rdd2.count()
>>>> Out[6]: 1
>>>>
>>>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>>>
>>>> In [8]: rdd3.count()
>>>> Out[8]: 508310
>>>>
>>>> In [9]: rdd3.take(1)
>>>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
>>>> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.in
>>>> ternal.warc.gz='WARC/1.0\r')]
>>>>
>>>> In [10]: def process_file(s):
>>>>     ...:     text = s[1]
>>>>     ...:     d = {}
>>>>     ...:     l =  text.split("\n")
>>>>     ...:     final = []
>>>>     ...:     the_id = "init"
>>>>     ...:     for line in l:
>>>>     ...:         if line[0:15] == 'WARC-Record-ID:':
>>>>     ...:             the_id = line[15:]
>>>>     ...:         d[the_id] = line
>>>>     ...:         final.append(Row(**d))
>>>>     ...:     return final
>>>>
>>>> In [12]: rdd2 = rdd1.map(process_file)
>>>>
>>>> In [13]: rdd2.count()
>>>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>>>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN
>>>> for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used.
>>>> Consider boosting spark.yarn.executor.memoryOverhead.
>>>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>>>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>>>> physical memory used. Consider boosting spark.yarn.executor.memoryOver
>>>> head.
>>>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID
>>>> 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>>>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>>>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>>>> 10.3 GB physical memory used. Consider boosting
>>>> spark.yarn.executor.memoryOverhead.
>>>>
>>>>
>>>> --
>>>> Henry Tremblay
>>>> Robert Half Technology
>>>>
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>
>>>>
>>>
>>> --
>>> Henry Tremblay
>>> Robert Half Technology
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha

Re: Spark runs out of memory with small file

Posted by Koert Kuipers <ko...@tresata.com>.
using wholeTextFiles to process formats that cannot be split per line is not
"old"

and there are plenty of problems for which RDD is still better suited than
Dataset or DataFrame currently (this might change in the near future when
Dataset gets some crucial optimizations fixed).

On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
> using SPARK.
>
> The core engine of SPARK, which even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using dataframes and try using SPARK 2.1.
>
> In other words it may be of tremendous benefit if you were learning to
> solve problems which exists rather than problems which does not exist any
> more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <pa...@gmail.com>
> wrote:
>
>> The file is so small that a stand alone python script, independent of
>> spark, can process the file in under a second.
>>
>> Also, the following fails:
>>
>> 1. Read the whole file in with wholeFiles
>>
>> 2. use flatMap to get 50,000 rows that looks like: Row(id="path",
>> line="line")
>>
>> 3. Save the results as CVS to HDFS
>>
>> 4. Read the files (there are 20) from HDFS into a df using
>> sqlContext.read.csv(<path>)
>>
>> 5. Convert the df to an rdd.
>>
>> 6 Create key value pairs with the key being the file path and the value
>> being the line.
>>
>> 7 Iterate through values
>>
>> What happens is Spark either runs out of memory, or, in my last try with
>> a slight variation, just hangs for 12 hours.
>>
>> Henry
>>
>> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>
>> Hi, Tremblay.
>> Your file is .gz format, which is not splittable for hadoop. Perhaps the
>> file is loaded by only one executor.
>> How many executors do you start?
>> Perhaps repartition method could solve it, I guess.
>>
>>
>> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <pa...@gmail.com>
>> wrote:
>>
>>> I am reading in a single small file from hadoop with wholeText. If I
>>> process each line and create a row with two cells, the first cell equal to
>>> the name of the file, the second cell equal to the line. That code runs
>>> fine.
>>>
>>> But if I just add two line of code and change the first cell based on
>>> parsing a line, spark runs out of memory. Any idea why such a simple
>>> process that would succeed quickly in a non spark application fails?
>>>
>>> Thanks!
>>>
>>> Henry
>>>
>>> CODE:
>>>
>>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>>> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.i
>>> nternal.warc.gz
>>>
>>>
>>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>>> In [2]: rdd1.count()
>>> Out[2]: 1
>>>
>>>
>>> In [4]: def process_file(s):
>>>    ...:     text = s[1]
>>>    ...:     the_id = s[0]
>>>    ...:     d = {}
>>>    ...:     l =  text.split("\n")
>>>    ...:     final = []
>>>    ...:     for line in l:
>>>    ...:         d[the_id] = line
>>>    ...:         final.append(Row(**d))
>>>    ...:     return final
>>>    ...:
>>>
>>> In [5]: rdd2 = rdd1.map(process_file)
>>>
>>> In [6]: rdd2.count()
>>> Out[6]: 1
>>>
>>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>>
>>> In [8]: rdd3.count()
>>> Out[8]: 508310
>>>
>>> In [9]: rdd3.take(1)
>>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
>>> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.in
>>> ternal.warc.gz='WARC/1.0\r')]
>>>
>>> In [10]: def process_file(s):
>>>     ...:     text = s[1]
>>>     ...:     d = {}
>>>     ...:     l =  text.split("\n")
>>>     ...:     final = []
>>>     ...:     the_id = "init"
>>>     ...:     for line in l:
>>>     ...:         if line[0:15] == 'WARC-Record-ID:':
>>>     ...:             the_id = line[15:]
>>>     ...:         d[the_id] = line
>>>     ...:         final.append(Row(**d))
>>>     ...:     return final
>>>
>>> In [12]: rdd2 = rdd1.map(process_file)
>>>
>>> In [13]: rdd2.count()
>>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN
>>> for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used.
>>> Consider boosting spark.yarn.executor.memoryOverhead.
>>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>>> physical memory used. Consider boosting spark.yarn.executor.memoryOver
>>> head.
>>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID
>>> 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>>> 10.3 GB physical memory used. Consider boosting
>>> spark.yarn.executor.memoryOverhead.
>>>
>>>
>>> --
>>> Henry Tremblay
>>> Robert Half Technology
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>
>>>
>>
>> --
>> Henry Tremblay
>> Robert Half Technology
>>
>>
>

Re: Spark runs out of memory with small file

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Henry,

Those guys in Databricks training are nuts and still use Spark 1.x for
their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
using SPARK.

The core engine of SPARK, which even I understand, has gone through several
fundamental changes.

Just try reading the file using DataFrames and try using Spark 2.1.
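
For example, something like this (just a sketch; spark.read.text reads the
gzipped file line by line into a single "value" column, and input_file_name()
gives you the originating path):

from pyspark.sql.functions import input_file_name

df = spark.read.text("/mnt/temp").withColumn("path", input_file_name())
df.count()

Grouping the lines back into WARC records would still take some extra work on
top of this, though.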

In other words, it may be of tremendous benefit if you were learning to
solve problems which exist rather than problems which do not exist any
more.

Please let me know in case I can be of any further help.

Regards,
Gourav

On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <pa...@gmail.com>
wrote:

> The file is so small that a stand alone python script, independent of
> spark, can process the file in under a second.
>
> Also, the following fails:
>
> 1. Read the whole file in with wholeFiles
>
> 2. use flatMap to get 50,000 rows that looks like: Row(id="path",
> line="line")
>
> 3. Save the results as CVS to HDFS
>
> 4. Read the files (there are 20) from HDFS into a df using
> sqlContext.read.csv(<path>)
>
> 5. Convert the df to an rdd.
>
> 6 Create key value pairs with the key being the file path and the value
> being the line.
>
> 7 Iterate through values
>
> What happens is Spark either runs out of memory, or, in my last try with a
> slight variation, just hangs for 12 hours.
>
> Henry
>
> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>
> Hi, Tremblay.
> Your file is .gz format, which is not splittable for hadoop. Perhaps the
> file is loaded by only one executor.
> How many executors do you start?
> Perhaps repartition method could solve it, I guess.
>
>
> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <pa...@gmail.com>
> wrote:
>
>> I am reading in a single small file from hadoop with wholeText. If I
>> process each line and create a row with two cells, the first cell equal to
>> the name of the file, the second cell equal to the line. That code runs
>> fine.
>>
>> But if I just add two line of code and change the first cell based on
>> parsing a line, spark runs out of memory. Any idea why such a simple
>> process that would succeed quickly in a non spark application fails?
>>
>> Thanks!
>>
>> Henry
>>
>> CODE:
>>
>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.i
>> nternal.warc.gz
>>
>>
>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>> In [2]: rdd1.count()
>> Out[2]: 1
>>
>>
>> In [4]: def process_file(s):
>>    ...:     text = s[1]
>>    ...:     the_id = s[0]
>>    ...:     d = {}
>>    ...:     l =  text.split("\n")
>>    ...:     final = []
>>    ...:     for line in l:
>>    ...:         d[the_id] = line
>>    ...:         final.append(Row(**d))
>>    ...:     return final
>>    ...:
>>
>> In [5]: rdd2 = rdd1.map(process_file)
>>
>> In [6]: rdd2.count()
>> Out[6]: 1
>>
>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>
>> In [8]: rdd3.count()
>> Out[8]: 508310
>>
>> In [9]: rdd3.take(1)
>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
>> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.in
>> ternal.warc.gz='WARC/1.0\r')]
>>
>> In [10]: def process_file(s):
>>     ...:     text = s[1]
>>     ...:     d = {}
>>     ...:     l =  text.split("\n")
>>     ...:     final = []
>>     ...:     the_id = "init"
>>     ...:     for line in l:
>>     ...:         if line[0:15] == 'WARC-Record-ID:':
>>     ...:             the_id = line[15:]
>>     ...:         d[the_id] = line
>>     ...:         final.append(Row(**d))
>>     ...:     return final
>>
>> In [12]: rdd2 = rdd1.map(process_file)
>>
>> In [13]: rdd2.count()
>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for
>> exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
>> boosting spark.yarn.executor.memoryOverhead.
>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>> physical memory used. Consider boosting spark.yarn.executor.memoryOver
>> head.
>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
>> ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>> 10.3 GB physical memory used. Consider boosting
>> spark.yarn.executor.memoryOverhead.
>>
>>
>> --
>> Henry Tremblay
>> Robert Half Technology
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>
>>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>

Re: Spark runs out of memory with small file

Posted by Henry Tremblay <pa...@gmail.com>.
The file is so small that a standalone Python script, independent of 
Spark, can process the file in under a second.

Also, the following fails:

1. Read the whole file in with wholeTextFiles

2. Use flatMap to get 50,000 rows that look like: Row(id="path", 
line="line")

3. Save the results as CSV to HDFS

4. Read the files (there are 20) from HDFS into a df using 
sqlContext.read.csv(<path>)

5. Convert the df to an rdd.

6. Create key-value pairs with the key being the file path and the value 
being the line.

7. Iterate through the values

What happens is Spark either runs out of memory, or, in my last try with 
a slight variation, just hangs for 12 hours.
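
For reference, a rough sketch of steps 1, 2 and 6 as I have them (leaving out
the CSV round trip in steps 3-5; the path is a placeholder):

from pyspark.sql import Row

rdd1 = sc.wholeTextFiles("/mnt/temp")                        # step 1
rows = rdd1.flatMap(lambda kv: [Row(id=kv[0], line=ln)       # step 2
                                for ln in kv[1].split("\n")])
pairs = rows.map(lambda r: (r.id, r.line))                   # step 6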

Henry


On 02/26/2017 03:31 AM, 颜发才 (Yan Facai) wrote:
> Hi, Tremblay.
> Your file is .gz format, which is not splittable for hadoop. Perhaps 
> the file is loaded by only one executor.
> How many executors do you start?
> Perhaps repartition method could solve it, I guess.
>
>
> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
> <paulhtremblay@gmail.com <ma...@gmail.com>> wrote:
>
>     I am reading in a single small file from hadoop with wholeText. If
>     I process each line and create a row with two cells, the first
>     cell equal to the name of the file, the second cell equal to the
>     line. That code runs fine.
>
>     But if I just add two line of code and change the first cell based
>     on parsing a line, spark runs out of memory. Any idea why such a
>     simple process that would succeed quickly in a non spark
>     application fails?
>
>     Thanks!
>
>     Henry
>
>     CODE:
>
>     [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>     3816096
>     /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>
>
>     In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>     In [2]: rdd1.count()
>     Out[2]: 1
>
>
>     In [4]: def process_file(s):
>        ...:     text = s[1]
>        ...:     the_id = s[0]
>        ...:     d = {}
>        ...:     l =  text.split("\n")
>        ...:     final = []
>        ...:     for line in l:
>        ...:         d[the_id] = line
>        ...:         final.append(Row(**d))
>        ...:     return final
>        ...:
>
>     In [5]: rdd2 = rdd1.map(process_file)
>
>     In [6]: rdd2.count()
>     Out[6]: 1
>
>     In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
>     In [8]: rdd3.count()
>     Out[8]: 508310
>
>     In [9]: rdd3.take(1)
>     Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>
>     In [10]: def process_file(s):
>         ...:     text = s[1]
>         ...:     d = {}
>         ...:     l =  text.split("\n")
>         ...:     final = []
>         ...:     the_id = "init"
>         ...:     for line in l:
>         ...:         if line[0:15] == 'WARC-Record-ID:':
>         ...:             the_id = line[15:]
>         ...:         d[the_id] = line
>         ...:         final.append(Row(**d))
>         ...:     return final
>
>     In [12]: rdd2 = rdd1.map(process_file)
>
>     In [13]: rdd2.count()
>     17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>     ip-172-31-41-89.us-west-2.compute.internal: Container killed by
>     YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical
>     memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>     17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>     Container killed by YARN for exceeding memory limits. 10.3 GB of
>     10.3 GB physical memory used. Consider boosting
>     spark.yarn.executor.memoryOverhead.
>     17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0
>     (TID 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>     ExecutorLostFailure (executor 5 exited caused by one of the
>     running tasks) Reason: Container killed by YARN for exceeding
>     memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
>     boosting spark.yarn.executor.memoryOverhead.
>
>
>     -- 
>     Henry Tremblay
>     Robert Half Technology
>
>
>     ---------------------------------------------------------------------
>     To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>     <ma...@spark.apache.org>
>
>

-- 
Henry Tremblay
Robert Half Technology


Re: Spark runs out of memory with small file

Posted by "颜发才 (Yan Facai)" <fa...@gmail.com>.
Hi, Tremblay.
Your file is in .gz format, which is not splittable for Hadoop, so perhaps the
file is loaded by only one executor.
How many executors do you start?
Perhaps the repartition method could solve it, I guess.
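
For example (a sketch reusing the rdd2 name from your session; 20 partitions
is just an arbitrary number):

# the .gz itself is still read by a single task, since gzip is not splittable,
# but the flattened rows can be spread across executors afterwards
rdd3 = rdd2.flatMap(lambda x: x).repartition(20)
rdd3.count()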


On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <pa...@gmail.com>
wrote:

> I am reading in a single small file from hadoop with wholeText. If I
> process each line and create a row with two cells, the first cell equal to
> the name of the file, the second cell equal to the line. That code runs
> fine.
>
> But if I just add two line of code and change the first cell based on
> parsing a line, spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non spark application fails?
>
> Thanks!
>
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.
> internal.warc.gz
>
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
> In [2]: rdd1.count()
> Out[2]: 1
>
>
> In [4]: def process_file(s):
>    ...:     text = s[1]
>    ...:     the_id = s[0]
>    ...:     d = {}
>    ...:     l =  text.split("\n")
>    ...:     final = []
>    ...:     for line in l:
>    ...:         d[the_id] = line
>    ...:         final.append(Row(**d))
>    ...:     return final
>    ...:
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.
> internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
>     ...:     text = s[1]
>     ...:     d = {}
>     ...:     l =  text.split("\n")
>     ...:     final = []
>     ...:     the_id = "init"
>     ...:     for line in l:
>     ...:         if line[0:15] == 'WARC-Record-ID:':
>     ...:             the_id = line[15:]
>     ...:         d[the_id] = line
>     ...:         final.append(Row(**d))
>     ...:     return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for
> exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
> boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOver
> head.
> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
> ip-172-31-41-89.us-west-2.compute.internal, executor 5):
> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
> 10.3 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>