Posted to user@spark.apache.org by Alexander Czech <al...@googlemail.com> on 2017/11/27 09:56:29 UTC

Loading a large parquet file how much memory do I need

I want to load a 10TB Parquet file from S3 and I'm trying to decide which
EC2 instances to use.

Should I go for instances whose combined memory is larger than 10TB? Or is
it enough that their combined SSD storage is large enough for everything
to be spilled to disk?

thanks

Re: Loading a large parquet file how much memory do I need

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

10 TB in Athena would cost $50. If your data is in Parquet, it will cost
even less, because Parquet is columnar and Athena only bills for the
columns a query actually scans. So I am genuinely not sure what you are
referring to. Also, what do you mean by "I currently need"? Are you
already processing the data?
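
As a rough illustration of the estimate above, here is a minimal sketch of the arithmetic. The $5 per TB rate is an assumption implied by "$50 for 10 TB", and the 0.5 scanned fraction is purely hypothetical; the real fraction depends on which columns a query touches.

```python
# Back-of-envelope Athena cost for scanning Parquet data.
# Assumption: $5 per TB scanned, implied by "$50 for 10 TB" in the message.
PRICE_PER_TB_USD = 5.0


def athena_scan_cost(total_tb, scanned_fraction=1.0):
    """Return the cost in USD when only `scanned_fraction` of the bytes is read."""
    if not 0.0 <= scanned_fraction <= 1.0:
        raise ValueError("scanned_fraction must be between 0 and 1")
    return round(total_tb * scanned_fraction * PRICE_PER_TB_USD, 2)


full_scan = athena_scan_cost(10)        # every column is read
half_scan = athena_scan_cost(10, 0.5)   # hypothetical: half of the bytes are read
print(full_scan, half_scan)
```

The point is only that the bill scales with the bytes scanned, so selecting fewer columns lowers the cost proportionally.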

Since you mentioned that you are a student, may I please ask which
University and College you are studying at? I may provide some additional
information regarding the same.


Regards,
Gourav Sengupta


On Mon, Nov 27, 2017 at 7:55 PM, Alexander Czech <
alexander.czech@googlemail.com> wrote:

> Hi,
> Simply because you pay the EMR fee on top of every instance hour. I
> currently need about 4800h of r3.2xlarge. EMR charges $0.18 per instance
> hour, so that would be $864 in EMR fees alone (spot prices are around
> $0.12/h).
>
> Just to stay on topic: I thought about getting 40 i2.xlarge instances,
> which have about 1TB of combined RAM and 32TB of combined SSD space. Would
> this be enough to load a 10TB parquet, or do I need more RAM/disk spill
> space?
>
> On Mon, Nov 27, 2017 at 6:06 PM, Gourav Sengupta <
> gourav.sengupta@gmail.com> wrote:
>
>> Hi,
>>
>> I think that I have mentioned all the required alternatives. However, I am
>> quite curious how you concluded that processing with EMR is going to be
>> more expensive than using any other stack. I have been using EMR for the
>> last 6 years (almost since it came out), and have always found it cheap,
>> reliable, safe and stable (of course it's like fire: if you are not
>> careful it can end up burning you financially).
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Mon, Nov 27, 2017 at 12:58 PM, Alexander Czech <
>> alexander.czech@googlemail.com> wrote:
>>
>>> I don't use EMR; I spin my clusters up using flintrock (being a student,
>>> my budget is slim). My code is written in pyspark and my data is in the
>>> us-east-1 region (N. Virginia). I will do my best explaining it with tables:
>>>
>>> My input, with a size of 10TB, sits in multiple (~150) parquet files on S3
>>>
>>> +-----------+--------------------------+-------+------+-------+
>>> |        uri|                 link_list|lang_id|vector|content|
>>> +-----------+--------------------------+-------+------+-------+
>>> |www.123.com|[www.123.com,www.abc.com,]|   null|  null|   null|
>>> |www.abc.com|[www.opq.com,www.456.com,]|   null|  null|   null|
>>> |www.456.com|[www.xyz.com,www.abc.com,]|   null|  null|   null|
>>>
>>>
>>> *(link_list is an ArrayType(StringType()))*
>>>
>>> Step 1: I only load the uri and link_list columns (but they make up the
>>> bulk of the data). Then every uri is given a unique ID with
>>> df.withColumn('uri_id', func.monotonically_increasing_id()),
>>> resulting in a dataframe looking like this:
>>>
>>> *DF_A*:
>>>
>>> +-----------+--------------------------+-------+
>>> |        uri|                 link_list| uri_id|
>>> +-----------+--------------------------+-------+
>>> |www.123.com|[www.123.com,www.abc.com,]|      1|
>>> |www.abc.com|[www.opq.com,www.456.com,]|      2|
>>> |www.456.com|[www.xyz.com,www.abc.com,]|      3|
>>>
>>> Step 2: I create another dataframe containing only the uri and uri_id fields, with uri_id renamed to link_id
>>>
>>> *DF_B*:
>>> +-----------+--------+
>>> |        uri| link_id|
>>> +-----------+--------+
>>> |www.123.com|       1|
>>> |www.abc.com|       2|
>>> |www.456.com|       3|
>>>
>>> Step 3: Now I explode the link_list field in DF_A with DF_A.select("uri_id", func.explode("link_list").alias("link"))
>>> This gives me
>>>
>>> *DF_C*:
>>> +-----------+-------+
>>> |       link| uri_id|
>>> +-----------+-------+
>>> |www.123.com|      1|
>>> |www.abc.com|      1|
>>> |www.opq.com|      2|
>>> |www.456.com|      2|
>>> |www.xyz.com|      3|
>>> |www.abc.com|      3|
>>>
>>>
>>> Lastly I join DF_C with DF_B: DF_C.join(DF_B, DF_C.link == DF_B.uri, "left_outer").drop("uri"), which results in the final dataframe:
>>>
>>>
>>> +-----------+-------+--------+
>>> |       link| uri_id| link_id|
>>> +-----------+-------+--------+
>>> |www.123.com|      1|       1|
>>> |www.abc.com|      1|       2|
>>> |www.opq.com|      2|    null|
>>> |www.456.com|      2|       3|
>>> |www.xyz.com|      3|    null|
>>> |www.abc.com|      3|       2|
>>>
>>> (in the code the link field is also dropped, but hopefully this way is more intelligible)
>>>
>>>
>>> The rest is just joining the uri_id with the rows whose lang_id, vector and content are not null, which is trivial.
>>>
>>> I hope this makes it more readable. If there is an AWS service that makes it easier for me to deal with the data, since it is basically "just" database operations, I'm also happy to hear about it.
>>> I have a few days on my hands until the preprocessing is done, but I'm not sure if the explode in step 3 can be done in another AWS service.
>>>
>>> thanks!
>>>
>>>
>>> On Mon, Nov 27, 2017 at 12:32 PM, Gourav Sengupta <
>>> gourav.sengupta@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> it would be much simpler if you just provided two tables with
>>>> samples of the input and output. Going through the verbose text and
>>>> trying to figure out what is happening is a bit daunting.
>>>>
>>>> Personally, given that you have your entire data in Parquet, I do not
>>>> think that you will need a large cluster at all. You can do it
>>>> with a small cluster as well, but depending on the cluster size, you
>>>> might want to create intermediate staging tables or persist the data.
>>>>
>>>> Also, it would help if you could kindly provide the EMR version
>>>> that you are using.
>>>>
>>>>
>>>> On another note, please also mention the AWS Region you are in. If
>>>> Redshift Spectrum is available, or you can use Athena or Presto, then
>>>> running massive aggregates over huge data sets at a fraction of the cost
>>>> and at least 10x the speed may be handy as well.
>>>>
>>>> Let me know in case you need any further help.
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>> On Mon, Nov 27, 2017 at 11:05 AM, Alexander Czech <
>>>> alexander.czech@googlemail.com> wrote:
>>>>
>>>>> I have a temporary result file (the 10TB one) with around 3 billion
>>>>> rows of (url,url_list,language,vector,text).
>>>>> The bulk of the data is in url_list, and at the moment I can only guess
>>>>> how large url_list is. I want to give an ID to every url and then this
>>>>> ID to every url in url_list, to get an ID-to-ID graph. The columns
>>>>> language, vector and text only have values for 1% of all rows, so they
>>>>> only play a very minor role.
>>>>>
>>>>> The idea at the moment is to load the URL and URL_list columns from the
>>>>> parquet and give every row an ID. Then explode the URL_list and join the
>>>>> IDs onto the now exploded rows. After that I drop the URLs from the
>>>>> URL_list column. For the rest of the computation I only load those rows
>>>>> from the parquet that have values in (language, vector and text) and join
>>>>> them with the ID table.
>>>>>
>>>>> In the end I will create 3 tables:
>>>>> 1. url, ID
>>>>> 2. ID, ID
>>>>> 3. ID,language,vector,text
>>>>>
>>>>> Basically there is one very big shuffle going on; the rest is not that
>>>>> heavy. The CPU-intensive lifting was done before that.
>>>>>
>>>>>> On Mon, Nov 27, 2017 at 11:01 AM, Georg Heiler <
>>>>>> georg.kf.heiler@gmail.com> wrote:
>>>>>>
>>>>>>> How many columns do you need from the big file?
>>>>>>>
>>>>>>> Also how CPU / memory intensive are the computations you want to
>>>>>>> perform?
>>>>>>>
>>>>>>> Alexander Czech <al...@googlemail.com> wrote on Mon, Nov 27,
>>>>>>> 2017 at 10:57:
>>>>>>>
>>>>>>>> I want to load a 10TB parquet File from S3 and I'm trying to decide
>>>>>>>> what EC2 instances to use.
>>>>>>>>
>>>>>>>> Should I go for instances that in total have a larger memory size
>>>>>>>> than 10TB? Or is it enough that they have in total enough SSD storage so
>>>>>>>> that everything can be spilled to disk?
>>>>>>>>
>>>>>>>> thanks
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Loading a large parquet file how much memory do I need

Posted by Alexander Czech <al...@googlemail.com>.
Hi,
Simply because you pay the EMR fee on top of every instance hour. I
currently need about 4800h of r3.2xlarge. EMR charges $0.18 per instance
hour, so that would be $864 in EMR fees alone (spot prices are around
$0.12/h).
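
The arithmetic behind these numbers can be written out as a small sketch. It assumes the quoted $0.12/h is the spot price of the same r3.2xlarge instances, which the message suggests but does not state outright.

```python
# EMR surcharge arithmetic using the figures quoted in the message.
INSTANCE_HOURS = 4800         # r3.2xlarge hours needed
EMR_FEE_PER_HOUR = 0.18       # USD, EMR fee charged on top of the EC2 price
SPOT_PRICE_PER_HOUR = 0.12    # USD, approximate spot price (assumed r3.2xlarge)

emr_fee = round(INSTANCE_HOURS * EMR_FEE_PER_HOUR, 2)
spot_cost = round(INSTANCE_HOURS * SPOT_PRICE_PER_HOUR, 2)
total_with_emr = round(emr_fee + spot_cost, 2)
# How large the EMR fee is relative to the spot compute cost itself.
fee_to_compute_ratio = round(emr_fee / spot_cost, 2)

print(f"EMR fee:            ${emr_fee:.2f}")
print(f"Spot compute:       ${spot_cost:.2f}")
print(f"Total with EMR:     ${total_with_emr:.2f}")
print(f"Fee / compute cost: {fee_to_compute_ratio}")
```

Under these assumptions the EMR fee would exceed the spot compute cost, which is the budget concern being raised.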

Just to stay on topic: I thought about getting 40 i2.xlarge instances,
which have about 1TB of combined RAM and 32TB of combined SSD space. Would
this be enough to load a 10TB parquet, or do I need more RAM/disk spill
space?
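
For reference, the combined capacity can be reproduced with a short sketch. The per-node figures (30.5 GiB RAM, one 800 GB SSD) are assumed i2.xlarge specs that are not stated in the thread; they are used here only because they match the totals quoted above.

```python
# Cluster capacity arithmetic for the proposed 40-node cluster.
NODES = 40
RAM_GIB_PER_NODE = 30.5   # assumed i2.xlarge spec, not stated in the thread
SSD_GB_PER_NODE = 800     # assumed i2.xlarge spec, not stated in the thread
DATASET_TB = 10           # size of the Parquet input on S3

total_ram_gib = NODES * RAM_GIB_PER_NODE
total_ssd_tb = NODES * SSD_GB_PER_NODE / 1000
# Ratio of local SSD space to the on-disk size of the input.
ssd_to_dataset_ratio = total_ssd_tb / DATASET_TB

print(f"Combined RAM: {total_ram_gib:.0f} GiB")
print(f"Combined SSD: {total_ssd_tb:.0f} TB")
print(f"SSD / dataset size: {ssd_to_dataset_ratio:.1f}x")
```

This only compares raw sizes. Parquet on S3 is compressed and encoded, so decompressed and shuffled data can be considerably larger than the 10TB on disk; the ratio is a starting point, not a guarantee that the spill space suffices.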

Re: Loading a large parquet file how much memory do I need

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

I think that I have mentioned all the required alternatives. However, I am
quite curious how you concluded that processing with EMR is going to be
more expensive than using any other stack. I have been using EMR for the
last 6 years (almost since it came out), and have always found it cheap,
reliable, safe and stable (of course it's like fire: if you are not
careful it can end up burning you financially).

Regards,
Gourav Sengupta

Re: Loading a large parquet file how much memory do I need

Posted by Alexander Czech <al...@googlemail.com>.
I don't use EMR; I spin my clusters up using flintrock (being a student, my
budget is slim). My code is written in pyspark and my data is in the
us-east-1 region (N. Virginia). I will do my best explaining it with tables:

My input, with a size of 10TB, sits in multiple (~150) parquet files on S3

+-----------+--------------------------+-------+------+-------+
|        uri|                 link_list|lang_id|vector|content|
+-----------+--------------------------+-------+------+-------+
|www.123.com|[www.123.com,www.abc.com,]|   null|  null|   null|
|www.abc.com|[www.opq.com,www.456.com,]|   null|  null|   null|
|www.456.com|[www.xyz.com,www.abc.com,]|   null|  null|   null|


*(link_list is an ArrayType(StringType()))*

Step 1: I only load the uri and link_list columns (but they make up the
bulk of the data). Then every uri is given a unique ID with
df.withColumn('uri_id', func.monotonically_increasing_id()),
resulting in a dataframe looking like this:

*DF_A*:

+-----------+--------------------------+-------+
|        uri|                 link_list| uri_id|
+-----------+--------------------------+-------+
|www.123.com|[www.123.com,www.abc.com,]|      1|
|www.abc.com|[www.opq.com,www.456.com,]|      2|
|www.456.com|[www.xyz.com,www.abc.com,]|      3|
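
One thing worth keeping in mind about the IDs in this table: Spark documents monotonically_increasing_id() as producing unique, increasing 64-bit values that are not consecutive, with the partition index in the upper 31 bits and the row position within the partition in the lower 33 bits. The 1, 2, 3 above are therefore illustrative. A minimal plain-Python model of that layout (the two-partition split is a hypothetical example, and this is not Spark code):

```python
# Plain-Python model of the documented ID layout:
# partition index in the upper 31 bits, row position in the lower 33 bits.
def model_monotonic_ids(partitions):
    """Return one ID per row, given rows grouped by partition."""
    ids = []
    for partition_index, rows in enumerate(partitions):
        for row_position, _ in enumerate(rows):
            ids.append((partition_index << 33) + row_position)
    return ids


# Hypothetical layout: the three sample uris spread over two partitions.
sample_partitions = [["www.123.com", "www.abc.com"], ["www.456.com"]]
ids = model_monotonic_ids(sample_partitions)
print(ids)
```

The IDs are still unique, which is all the later join needs, but they start at 0 and jump between partitions.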

Step 2: I create another dataframe containing only the uri and uri_id
fields, with uri_id renamed to link_id

*DF_B*:
+-----------+--------+
|        uri| link_id|
+-----------+--------+
|www.123.com|       1|
|www.abc.com|       2|
|www.456.com|       3|

Step 3: Now I explode the link_list field in DF_A with
DF_A.select("uri_id", func.explode("link_list").alias("link"))
This gives me

*DF_C*:
+-----------+-------+
|       link| uri_id|
+-----------+-------+
|www.123.com|      1|
|www.abc.com|      1|
|www.opq.com|      2|
|www.456.com|      2|
|www.xyz.com|      3|
|www.abc.com|      3|
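
To make the row multiplication in step 3 concrete, here is a plain-Python model of the explode on the sample data (a sketch of the semantics, not the PySpark call itself). Like Spark's explode, it emits no row for a null or empty list.

```python
# Sample rows of DF_A from the tables above.
df_a = [
    {"uri": "www.123.com", "link_list": ["www.123.com", "www.abc.com"], "uri_id": 1},
    {"uri": "www.abc.com", "link_list": ["www.opq.com", "www.456.com"], "uri_id": 2},
    {"uri": "www.456.com", "link_list": ["www.xyz.com", "www.abc.com"], "uri_id": 3},
]


def explode_links(rows):
    """Emit one (link, uri_id) row per element of link_list."""
    exploded = []
    for row in rows:
        # A null (None) or empty list contributes no output rows.
        for link in row["link_list"] or []:
            exploded.append({"link": link, "uri_id": row["uri_id"]})
    return exploded


df_c = explode_links(df_a)
for row in df_c:
    print(row["link"], row["uri_id"])
```

The output has as many rows as there are links in total, which is why this step dominates the size of the shuffle that follows.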


Lastly I join DF_C with DF_B: DF_C.join(DF_B, DF_C.link == DF_B.uri,
"left_outer").drop("uri"), which results in the final dataframe:


+-----------+-------+--------+
|       link| uri_id| link_id|
+-----------+-------+--------+
|www.123.com|      1|       1|
|www.abc.com|      1|       2|
|www.opq.com|      2|    null|
|www.456.com|      2|       3|
|www.xyz.com|      3|    null|
|www.abc.com|      3|       2|

(in the code the link field is also dropped, but hopefully this way is
more intelligible)
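
The left outer join can be modelled the same way in plain Python. This sketch assumes every uri appears exactly once in DF_B, so a dictionary lookup behaves like the join; links without a match get None, the counterpart of null. With the sample DF_B, www.abc.com resolves to link_id 2 in both of its rows.

```python
# DF_B as a lookup table: uri -> link_id (assumes each uri is unique).
df_b = {"www.123.com": 1, "www.abc.com": 2, "www.456.com": 3}

# DF_C as (link, uri_id) pairs, taken from the exploded sample above.
df_c = [
    ("www.123.com", 1),
    ("www.abc.com", 1),
    ("www.opq.com", 2),
    ("www.456.com", 2),
    ("www.xyz.com", 3),
    ("www.abc.com", 3),
]


def left_outer_join(links, uri_to_id):
    """Keep every row of `links`; attach link_id, or None if the uri is unknown."""
    return [(link, uri_id, uri_to_id.get(link)) for link, uri_id in links]


edges = left_outer_join(df_c, df_b)
for link, uri_id, link_id in edges:
    print(link, uri_id, link_id)
```

Dropping the link column from each tuple leaves the (uri_id, link_id) pairs, that is, the ID-to-ID graph.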


The rest is just joining the uri_id with the rows whose lang_id, vector and
content are not null, which is trivial.

I hope this makes it more readable. If there is an AWS service that
makes it easier for me to deal with the data, since it is basically
"just" database operations, I'm also happy to hear about it.
I have a few days on my hands until the preprocessing is done, but I'm
not sure if the explode in step 3 can be done in another AWS service.

thanks!


Re: Loading a large parquet file how much memory do I need

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

It would be much simpler if you just provided two tables with samples of
the input and output. Going through the verbose text and trying to figure
out what is happening is a bit daunting.

Personally, given that all of your data is in Parquet, I do not think that
you will need a large cluster at all. You can do it with a small cluster as
well, but depending on the cluster size, you might want to create
intermediate staging tables or persist the data.
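
A back-of-the-envelope check of the cluster-size question could look like the
sketch below. Only the 10TB figure comes from the thread. The per-node RAM and
SSD values (30.5 GiB and 800 GB, roughly an i2.xlarge), the fraction of the
data the job touches, and the in-memory expansion factor are assumptions to be
replaced with real numbers. The function name `estimate_fit` is hypothetical.
It only illustrates that RAM does not have to hold the whole data set as long
as the working set fits in RAM plus local spill space.

```python
TB = 10**12   # decimal terabyte in bytes
GB = 10**9    # decimal gigabyte in bytes
GIB = 2**30   # binary gibibyte in bytes


def estimate_fit(dataset_bytes, nodes, ram_gib_per_node, ssd_gb_per_node,
                 touched_fraction=1.0, expansion=1.0):
    """Compare an estimated working set with total cluster RAM and SSD.

    touched_fraction: share of the file the job actually reads (assumption).
    expansion: how much the data grows once decompressed (assumption).
    """
    working_set = dataset_bytes * touched_fraction * expansion
    total_ram = nodes * ram_gib_per_node * GIB
    total_ssd = nodes * ssd_gb_per_node * GB
    return {
        "working_set_tb": working_set / TB,
        "ram_tb": total_ram / TB,
        "ssd_tb": total_ssd / TB,
        "fits_in_ram": working_set <= total_ram,
        "fits_with_spill": working_set <= total_ram + total_ssd,
    }


# 40 nodes with assumed i2.xlarge-like specs against the full 10TB file.
estimate = estimate_fit(10 * TB, nodes=40, ram_gib_per_node=30.5,
                        ssd_gb_per_node=800)
for key, value in estimate.items():
    print(key, value)
```

Raising `expansion` above 1.0 is the quickest way to see how sensitive the
answer is to compressed Parquet growing once it is decoded.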

It would also help if you could provide the EMR version that you are using.

On another note, please also mention the AWS Region you are in. If Redshift
Spectrum, Athena, or Presto is available to you, then running massive
aggregates over huge data sets at a fraction of the cost and at least 10x
the speed may be handy as well.

Let me know in case you need any further help.

Regards,
Gourav

Re: Loading a large parquet file how much memory do I need

Posted by Alexander Czech <al...@googlemail.com>.
I have a temporary result file (the 10TB one) with around 3 billion rows of
(url, url_list, language, vector, text). The bulk of the data is in
url_list, and at the moment I can only guess how large url_list is. I want
to give an ID to every url and then assign this ID to every url in url_list
to get an ID-to-ID graph. The columns language, vector and text only have
values for 1% of all rows, so they only play a very minor role.

The idea at the moment is to load the url and url_list columns from the
parquet and give every row an ID. Then I explode url_list and join the IDs
onto the now exploded rows. After that I drop the URLs from the url_list
column. For the rest of the computation I only load those rows from the
parquet that have values in (language, vector and text) and join them with
the ID table.

In the end I will create 3 tables:
1. url, ID
2. ID, ID
3. ID,language,vector,text

Basically there is one very big shuffle going on; the rest is not that
heavy. The CPU-intensive heavy lifting was done before that.
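
The data flow described above can be modelled on a toy input as follows. This
is a plain-Python sketch, not Spark code; in Spark the same steps would
roughly map onto a column-pruned Parquet read, an ID assignment (for example
`zipWithUniqueId` on an RDD or `monotonically_increasing_id` on a DataFrame),
`explode`, and a join. The function name `build_tables` and the sample rows
are hypothetical. Two details are assumptions the thread does not settle:
every url appears in exactly one row, and entries in url_list that have no
row of their own are dropped (an inner join).

```python
def build_tables(rows):
    """Build the three output tables from rows of
    (url, url_list, language, vector, text) given as dicts."""
    rows = list(rows)

    # Table 1 (url, ID): give every row an ID. Assumes urls are unique.
    url_to_id = {row["url"]: i for i, row in enumerate(rows)}

    # Table 2 (ID, ID): explode url_list and join the IDs onto each entry.
    edges = []
    for row in rows:
        source_id = url_to_id[row["url"]]
        for target_url in row.get("url_list") or []:
            target_id = url_to_id.get(target_url)
            if target_id is not None:  # inner join: unknown targets are dropped
                edges.append((source_id, target_id))

    # Table 3 (ID, language, vector, text): only rows that carry any of
    # the three sparse columns.
    attributes = [
        (url_to_id[row["url"]], row.get("language"), row.get("vector"),
         row.get("text"))
        for row in rows
        if any(row.get(col) is not None
               for col in ("language", "vector", "text"))
    ]
    return url_to_id, edges, attributes


sample_rows = [
    {"url": "a.example", "url_list": ["b.example", "c.example"],
     "language": "en", "vector": [0.1, 0.2], "text": "hello"},
    {"url": "b.example", "url_list": ["a.example"]},
    {"url": "c.example", "url_list": ["x.example"]},
]

url_ids, id_edges, id_attributes = build_tables(sample_rows)
print(url_ids)
print(id_edges)
print(id_attributes)
```

The join in the second step is where the one very big shuffle would come
from, since every exploded url has to be matched against the url-to-ID table.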

Re: Loading a large parquet file how much memory do I need

Posted by Georg Heiler <ge...@gmail.com>.
How many columns do you need from the big file?

Also, how CPU- and memory-intensive are the computations you want to perform?