You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Hao Ren <in...@gmail.com> on 2014/11/05 18:39:51 UTC

Understanding spark operation pipeline and block storage

Hi,

I would like to understand the pipeline of spark's operation(transformation
and action) and some details on block storage.

Let's consider the following code:

val rdd1 = SparkContext.textFile("hdfs://...")
rdd1.map(func1).map(func2).count

For example, we have a file in hdfs about 80Gb, already split in 32 files,
each 2.5Gb.

q1) How many partitions will rdd1 have ? 
rule 1) Maybe 32, since there are 32 split files ? Because, most of the
case, this rule is true if the file is not big in size.
rule 2) Maybe more, I am not sure whether spark's block store can contain a
2.5Gb partition. Is there some parameter specify the block store size ?
AFAIK, hdfs block size is used to read data from hdfs by spark. So there
will be (80Gb/hdfsBlockSize) partitions in rdd1, right ? Usually, the hdfs
block size is 64Mb, then we will have 80g / 64m = 1280 partitions ? Too many
?

Which criterion will it take ? the number of split files or hdfs block size.

q2) Here, func1 and func2 are sequentially added into DAG. What's the
workflow on the partition level ?
option1: Given a partition, func1 and func2 will be applied to each element
in this partition sequentially. After everything is done, we count the # of
line in the partition and send count result to drive. Then, we take the next
partition and do the same thing?
option2: Or else, we apply func1 to all the partitions first, then apply
func2 to all partitions which have applied func1, count # of line in each
partition and send result to driver ?

I have do some tests, it seems that option1 is correct. Can anyone confirm
this ?
So in option 1, we have 1 job "count" which contains 3 stages: map(func1),
map(func2), count.

q3) What if we run out of memory ?

Suppose we have 12 cores, 15Gb memory in cluster.

Case1 :
For example, the func1 will take one line in file, and create an big object
for each line, then the partition applied func1 will become a large
partition. If we have 12 cores in clusters, that means we may have 12 large
partitions in memory. What if these partitions are much bigger than memory ?
What will happen ? an exception OOM / heap size, etc ?

Case2 : 
Suppose the input is 80 GB, but we force RDD to be repartitioned into 6
partitions which is small than the number of core. Normally, each partition
will be send to a core, then all the input will be in memory. However, we
have 15G memory in Cluster. What will happen ? OOM Exception ? 
Then, could we just split the RDD into more partitions so that 80GB /
#partition *12(which is # of cores) < 15Gb(memory size) ? Meanwhile, we can
not split too many, which leads to some overhead on task distribution.

If we read data from hdfs using hdfs block size 64MB as partition size, we
will have a formula like:
64Mb * # of cores < Memory
which in most case is true. Could this explain why we reading hdfs using
block size will not leads to OOM like case 2, even if the data is very big
in size.

Sorry for making this post a bit long. Hope I make myself clear.
Any help on any question will be appreciated.

Thank you.

Hao.













--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Understanding spark operation pipeline and block storage

Posted by Hao Ren <in...@gmail.com>.
Anyone has idea on this ?

Thx



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201p19263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Understanding spark operation pipeline and block storage

Posted by Hao Ren <in...@gmail.com>.
Hi Cheng,

You are right. =)

I checked your article a few days ago. I have some further questions:

According to the article, the following code takes the spatial complexity
o(1).

val lines = spark.textFile("hdfs://<input>")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://<output>")

It reads one line from HDFS at a time, apply filter closure, then map
closure on the line

But as far as I know, map, filter, etc are operations based on partitions.
(If I am wrong, correct me please.) So, essentially, it is a partition that
is loaded into memory, a task will process one partition at a time on a cpu
core. And the iterator is based on the partition.

If the partition is too large to fit into memory, we will have a OOM. I read
some posts about OOM, increasing the number of partition is a common
solution.

I am not sure if my understanding is right. Help needed.

Thank you.

Hao





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201p19074.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Understanding spark operation pipeline and block storage

Posted by Cheng Lian <li...@gmail.com>.
On 11/6/14 1:39 AM, Hao Ren wrote:

> Hi,
>
> I would like to understand the pipeline of spark's operation(transformation
> and action) and some details on block storage.
>
> Let's consider the following code:
>
> val rdd1 = SparkContext.textFile("hdfs://...")
> rdd1.map(func1).map(func2).count
>
> For example, we have a file in hdfs about 80Gb, already split in 32 files,
> each 2.5Gb.
>
> q1) How many partitions will rdd1 have ?
> rule 1) Maybe 32, since there are 32 split files ? Because, most of the
> case, this rule is true if the file is not big in size.
> rule 2) Maybe more, I am not sure whether spark's block store can contain a
> 2.5Gb partition. Is there some parameter specify the block store size ?
> AFAIK, hdfs block size is used to read data from hdfs by spark. So there
> will be (80Gb/hdfsBlockSize) partitions in rdd1, right ? Usually, the hdfs
> block size is 64Mb, then we will have 80g / 64m = 1280 partitions ? Too many
> ?
>
> Which criterion will it take ? the number of split files or hdfs block size.

Rule 2 applies here. |textFile| delegates to |hadoopFile|, which 
respects HDFS block size. You may use |RDD.coalesce(n)| to reduce 
partition number if that’s too large for you.

> q2) Here, func1 and func2 are sequentially added into DAG. What's the
> workflow on the partition level ?
> option1: Given a partition, func1 and func2 will be applied to each element
> in this partition sequentially. After everything is done, we count the # of
> line in the partition and send count result to drive. Then, we take the next
> partition and do the same thing?
> option2: Or else, we apply func1 to all the partitions first, then apply
> func2 to all partitions which have applied func1, count # of line in each
> partition and send result to driver ?
>
> I have do some tests, it seems that option1 is correct. Can anyone confirm
> this ?
> So in option 1, we have 1 job "count" which contains 3 stages: map(func1),
> map(func2), count.

Option 1 is correct, however here we only have 1 stage. A stage is only 
introduced when transformations with wide dependencies are used (e.g. 
|reduceByKey|).

> q3) What if we run out of memory ?
>
> Suppose we have 12 cores, 15Gb memory in cluster.
>
> Case1 :
> For example, the func1 will take one line in file, and create an big object
> for each line, then the partition applied func1 will become a large
> partition. If we have 12 cores in clusters, that means we may have 12 large
> partitions in memory. What if these partitions are much bigger than memory ?
> What will happen ? an exception OOM / heap size, etc ?
>
> Case2 :
> Suppose the input is 80 GB, but we force RDD to be repartitioned into 6
> partitions which is small than the number of core. Normally, each partition
> will be send to a core, then all the input will be in memory. However, we
> have 15G memory in Cluster. What will happen ? OOM Exception ?
> Then, could we just split the RDD into more partitions so that 80GB /
> #partition *12(which is # of cores) < 15Gb(memory size) ? Meanwhile, we can
> not split too many, which leads to some overhead on task distribution.
>
> If we read data from hdfs using hdfs block size 64MB as partition size, we
> will have a formula like:
> 64Mb * # of cores < Memory
> which in most case is true. Could this explain why we reading hdfs using
> block size will not leads to OOM like case 2, even if the data is very big
> in size.

Wrote an article about this months ago 
http://www.zhihu.com/question/23079001/answer/23569986

The article is in Chinese. I guess you’re Chinese from your name, 
apologize if I’m wrong :)

> Sorry for making this post a bit long. Hope I make myself clear.
> Any help on any question will be appreciated.
>
> Thank you.
>
> Hao.
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>
​

Re: Understanding spark operation pipeline and block storage

Posted by Hao Ren <in...@gmail.com>.
Hey, guys

Feel free to ask for more details if my questions are not clear.

Any insight here ?





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201p18496.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org