You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by bethesda <sw...@mac.com> on 2014/12/16 13:51:53 UTC

Why so many tasks?

Our job is creating what appears to be an inordinate number of very small
tasks, which blow out our os inode and file limits.  Rather than continually
upping those limits, we are seeking to understand whether our real problem
is that too many tasks are running, perhaps because we are mis-configured or
we are coding incorrectly.

Rather than posting our actual code I have re-created the essence of the
matter in the shell with a directory of files simulating the data we deal
with.  We have three servers, each with 8G RAM.

Given 1,000 files, each containing a string of 100 characters, in the
myfiles directory:

val data = sc.textFile("/user/foo/myfiles/*")

val c = data.count

The count operation produces 1,000 tasks.  Is this normal?

val cart = data.cartesian(data)
cart.count

The cartesian operation produces 1M tasks.  I understand that the cartesian
product of 1,000 items against itself is 1M, however, it seems the overhead
of all this task creation and file I/O of all these tiny files outweighs the
gains of distributed computing.  What am I missing here?

Below is the truncated output of the count operation, if this helps indicate
a configuration problem.

Thank you.

scala> data.count
14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
14/12/16 07:40:47 INFO SparkContext: Starting job: count at <console>:15
14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at <console>:15) with
1000 output partitions (allowLocal=false)
14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
<console>:15)
14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
(/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12), which has
no missing parents
14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
curMem=507154, maxMem=278019440
14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 2.3 KB, free 264.7 MB)
14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
curMem=509554, maxMem=278019440
14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes
in memory (estimated size 1813.0 B, free 264.7 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0
14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12)
14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
tasks
14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-3.abc.cloud/10.40.13.192:36133]
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-2.abc.cloud/10.40.13.195:35716]
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-1.abc.cloud/10.40.13.194:33728]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-1.abc.cloud/10.40.13.194:49458]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-3.abc.cloud/10.40.13.192:58579]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-2.abc.cloud/10.40.13.195:52502]
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-2.abc.cloud/10.40.13.195:52502], 1 messages pending
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-1.abc.cloud:49458 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-3.abc.cloud:58579 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-2.abc.cloud:52502 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on dev-myserver-3.abc.cloud:58579 (size: 32.5 KB, free: 1060.0 MB)
14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on dev-myserver-2.abc.cloud:52502 (size: 32.5 KB, free: 1060.0 MB)
14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
on dev-myserver-1.abc.cloud:49458 (size: 32.5 KB, free: 1060.0 MB)
14/12/16 07:40:49 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID
6, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:49 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID
7, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:49 INFO TaskSetManager: Starting task 19.0 in stage 0.0 (TID
8, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:49 INFO TaskSetManager: Starting task 23.0 in stage 0.0 (TID
9, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:49 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID
10, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:49 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID
11, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
2) in 1964 ms on dev-myserver-1.abc.cloud (1/1000)
14/12/16 07:40:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
0) in 2000 ms on dev-myserver-2.abc.cloud (2/1000)
14/12/16 07:40:49 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID
5) in 1975 ms on dev-myserver-1.abc.cloud (3/1000)
....





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Why so many tasks?

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Try to repartition the data like:

val data = sc.textFile("/user/foo/myfiles/*").repartition(100)


​Since the file size is small it shouldn't be a problem.​


Thanks
Best Regards

On Tue, Dec 16, 2014 at 6:21 PM, bethesda <sw...@mac.com> wrote:
>
> Our job is creating what appears to be an inordinate number of very small
> tasks, which blow out our os inode and file limits.  Rather than
> continually
> upping those limits, we are seeking to understand whether our real problem
> is that too many tasks are running, perhaps because we are mis-configured
> or
> we are coding incorrectly.
>
> Rather than posting our actual code I have re-created the essence of the
> matter in the shell with a directory of files simulating the data we deal
> with.  We have three servers, each with 8G RAM.
>
> Given 1,000 files, each containing a string of 100 characters, in the
> myfiles directory:
>
> val data = sc.textFile("/user/foo/myfiles/*")
>
> val c = data.count
>
> The count operation produces 1,000 tasks.  Is this normal?
>
> val cart = data.cartesian(data)
> cart.count
>
> The cartesian operation produces 1M tasks.  I understand that the cartesian
> product of 1,000 items against itself is 1M, however, it seems the overhead
> of all this task creation and file I/O of all these tiny files outweighs
> the
> gains of distributed computing.  What am I missing here?
>
> Below is the truncated output of the count operation, if this helps
> indicate
> a configuration problem.
>
> Thank you.
>
> scala> data.count
> 14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
> 14/12/16 07:40:47 INFO SparkContext: Starting job: count at <console>:15
> 14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at <console>:15) with
> 1000 output partitions (allowLocal=false)
> 14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
> <console>:15)
> 14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
> (/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12), which
> has
> no missing parents
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
> curMem=507154, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 2.3 KB, free 264.7 MB)
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
> curMem=509554, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes
> in memory (estimated size 1813.0 B, free 264.7 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
> 14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
> Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12)
> 14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
> tasks
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
> 2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-3.abc.cloud/10.40.13.192:36133]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-2.abc.cloud/10.40.13.195:35716]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-1.abc.cloud/10.40.13.194:33728]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502]
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502], 1 messages pending
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:49458 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-3.abc.cloud:58579 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-2.abc.cloud:52502 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-3.abc.cloud:58579 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-2.abc.cloud:52502 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-1.abc.cloud:49458 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID
> 6, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID
> 7, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 19.0 in stage 0.0 (TID
> 8, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 23.0 in stage 0.0 (TID
> 9, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID
> 10, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID
> 11, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
> 2) in 1964 ms on dev-myserver-1.abc.cloud (1/1000)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
> 0) in 2000 ms on dev-myserver-2.abc.cloud (2/1000)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID
> 5) in 1975 ms on dev-myserver-1.abc.cloud (3/1000)
> ....
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Why so many tasks?

Posted by Ashish Rangole <ar...@gmail.com>.
Take a look at combine file input format. Repartition or coalesce could
introduce shuffle I/O overhead.
On Dec 16, 2014 7:09 AM, "bethesda" <sw...@mac.com> wrote:

> Thank you!  I had known about the small-files problem in HDFS but didn't
> realize that it affected sc.textFile().
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712p20717.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Why so many tasks?

Posted by bethesda <sw...@mac.com>.
Thank you!  I had known about the small-files problem in HDFS but didn't
realize that it affected sc.textFile().



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712p20717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Why so many tasks?

Posted by Gen <ge...@gmail.com>.
Hi,

As you have 1,000 files, the RDD created by textFile will have 1,000
partitions. It is normal. In fact, as the same principal of HDFS, it is
better to store data with smaller number of files but larger size file. 

You can use data.coalesce(10) to solve this problem(it reduce the number of
partitions). 

Cheers
Gen



bethesda wrote
> Our job is creating what appears to be an inordinate number of very small
> tasks, which blow out our os inode and file limits.  Rather than
> continually upping those limits, we are seeking to understand whether our
> real problem is that too many tasks are running, perhaps because we are
> mis-configured or we are coding incorrectly.
> 
> Rather than posting our actual code I have re-created the essence of the
> matter in the shell with a directory of files simulating the data we deal
> with.  We have three servers, each with 8G RAM.
> 
> Given 1,000 files, each containing a string of 100 characters, in the
> myfiles directory:
> 
> val data = sc.textFile("/user/foo/myfiles/*")
> 
> val c = data.count
> 
> The count operation produces 1,000 tasks.  Is this normal?
> 
> val cart = data.cartesian(data)
> cart.count
> 
> The cartesian operation produces 1M tasks.  I understand that the
> cartesian product of 1,000 items against itself is 1M, however, it seems
> the overhead of all this task creation and file I/O of all these tiny
> files outweighs the gains of distributed computing.  What am I missing
> here?
> 
> Below is the truncated output of the count operation, if this helps
> indicate a configuration problem.
> 
> Thank you.
> 
> scala> data.count
> 14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process :
> 1000
> 14/12/16 07:40:47 INFO SparkContext: Starting job: count at 
> <console>
> :15
> 14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at 
> <console>
> :15) with 1000 output partitions (allowLocal=false)
> 14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at 
> <console>
> :15)
> 14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
> (/user/ds/randomfiles/* MappedRDD[3] at textFile at 
> <console>
> :12), which has no missing parents
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
> curMem=507154, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 2.3 KB, free 264.7 MB)
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
> curMem=509554, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes in memory (estimated size 1813.0 B, free 264.7 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in
> memory on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
> 14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
> Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at 
> <console>
> :12)
> 14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
> tasks
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
> 2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-3.abc.cloud/10.40.13.192:36133]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-2.abc.cloud/10.40.13.195:35716]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-1.abc.cloud/10.40.13.194:33728]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502]
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502], 1 messages pending
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in
> memory on dev-myserver-1.abc.cloud:49458 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in
> memory on dev-myserver-3.abc.cloud:58579 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in
> memory on dev-myserver-2.abc.cloud:52502 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on dev-myserver-3.abc.cloud:58579 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on dev-myserver-2.abc.cloud:52502 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in
> memory on dev-myserver-1.abc.cloud:49458 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID
> 6, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID
> 7, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 19.0 in stage 0.0
> (TID 8, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 23.0 in stage 0.0
> (TID 9, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID
> 10, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID
> 11, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
> 2) in 1964 ms on dev-myserver-1.abc.cloud (1/1000)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
> 0) in 2000 ms on dev-myserver-2.abc.cloud (2/1000)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID
> 5) in 1975 ms on dev-myserver-1.abc.cloud (3/1000)
> ....





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712p20715.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Why so many tasks?

Posted by Gerard Maas <ge...@gmail.com>.
Creating an RDD from a wildcard like this:
val data = sc.textFile("/user/foo/myfiles/*")

Will create 1 partition for each file found. 1000 files = 1000 partitions.
A task is a job stage (defined as a sequence of transformations) applied to
a partition, so 1000 partitions = 1000 tasks per stage.

You can reduce the amount of partitions at any time with rdd.coalesce:
val coalescedRDD = data.coalesce (10)  // 10 partitions

-kr, Gerard.
@maasg




On Tue, Dec 16, 2014 at 1:51 PM, bethesda <sw...@mac.com> wrote:
>
> Our job is creating what appears to be an inordinate number of very small
> tasks, which blow out our os inode and file limits.  Rather than
> continually
> upping those limits, we are seeking to understand whether our real problem
> is that too many tasks are running, perhaps because we are mis-configured
> or
> we are coding incorrectly.
>
> Rather than posting our actual code I have re-created the essence of the
> matter in the shell with a directory of files simulating the data we deal
> with.  We have three servers, each with 8G RAM.
>
> Given 1,000 files, each containing a string of 100 characters, in the
> myfiles directory:
>
> val data = sc.textFile("/user/foo/myfiles/*")
>
> val c = data.count
>
> The count operation produces 1,000 tasks.  Is this normal?
>
> val cart = data.cartesian(data)
> cart.count
>
> The cartesian operation produces 1M tasks.  I understand that the cartesian
> product of 1,000 items against itself is 1M, however, it seems the overhead
> of all this task creation and file I/O of all these tiny files outweighs
> the
> gains of distributed computing.  What am I missing here?
>
> Below is the truncated output of the count operation, if this helps
> indicate
> a configuration problem.
>
> Thank you.
>
> scala> data.count
> 14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
> 14/12/16 07:40:47 INFO SparkContext: Starting job: count at <console>:15
> 14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at <console>:15) with
> 1000 output partitions (allowLocal=false)
> 14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
> <console>:15)
> 14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
> (/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12), which
> has
> no missing parents
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
> curMem=507154, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 2.3 KB, free 264.7 MB)
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
> curMem=509554, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes
> in memory (estimated size 1813.0 B, free 264.7 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
> 14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
> Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12)
> 14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
> tasks
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
> 2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-3.abc.cloud/10.40.13.192:36133]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-2.abc.cloud/10.40.13.195:35716]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-1.abc.cloud/10.40.13.194:33728]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502]
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502], 1 messages pending
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:49458 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-3.abc.cloud:58579 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-2.abc.cloud:52502 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-3.abc.cloud:58579 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-2.abc.cloud:52502 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-1.abc.cloud:49458 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID
> 6, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID
> 7, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 19.0 in stage 0.0 (TID
> 8, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 23.0 in stage 0.0 (TID
> 9, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID
> 10, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID
> 11, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
> 2) in 1964 ms on dev-myserver-1.abc.cloud (1/1000)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
> 0) in 2000 ms on dev-myserver-2.abc.cloud (2/1000)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID
> 5) in 1975 ms on dev-myserver-1.abc.cloud (3/1000)
> ....
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Why so many tasks?

Posted by Koert Kuipers <ko...@tresata.com>.
sc.textFile uses a hadoop input format. hadoop input formats by default
create one task per file, and they are not very suitable for many very
small files. can you turns your 1000 files into one larger text file?

otherwise maybe try:
val data = sc.textFile("/user/foo/myfiles/*").coalesce(100)

On Tue, Dec 16, 2014 at 7:51 AM, bethesda <sw...@mac.com> wrote:
>
> Our job is creating what appears to be an inordinate number of very small
> tasks, which blow out our os inode and file limits.  Rather than
> continually
> upping those limits, we are seeking to understand whether our real problem
> is that too many tasks are running, perhaps because we are mis-configured
> or
> we are coding incorrectly.
>
> Rather than posting our actual code I have re-created the essence of the
> matter in the shell with a directory of files simulating the data we deal
> with.  We have three servers, each with 8G RAM.
>
> Given 1,000 files, each containing a string of 100 characters, in the
> myfiles directory:
>
> val data = sc.textFile("/user/foo/myfiles/*")
>
> val c = data.count
>
> The count operation produces 1,000 tasks.  Is this normal?
>
> val cart = data.cartesian(data)
> cart.count
>
> The cartesian operation produces 1M tasks.  I understand that the cartesian
> product of 1,000 items against itself is 1M, however, it seems the overhead
> of all this task creation and file I/O of all these tiny files outweighs
> the
> gains of distributed computing.  What am I missing here?
>
> Below is the truncated output of the count operation, if this helps
> indicate
> a configuration problem.
>
> Thank you.
>
> scala> data.count
> 14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
> 14/12/16 07:40:47 INFO SparkContext: Starting job: count at <console>:15
> 14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at <console>:15) with
> 1000 output partitions (allowLocal=false)
> 14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
> <console>:15)
> 14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
> (/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12), which
> has
> no missing parents
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
> curMem=507154, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 2.3 KB, free 264.7 MB)
> 14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
> curMem=509554, maxMem=278019440
> 14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes
> in memory (estimated size 1813.0 B, free 264.7 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
> 14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
> Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12)
> 14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
> tasks
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
> 0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
> 1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
> 2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
> 3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
> 4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
> 14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
> 5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-3.abc.cloud/10.40.13.192:36133]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-2.abc.cloud/10.40.13.195:35716]
> 14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
> [dev-myserver-1.abc.cloud/10.40.13.194:33728]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579]
> 14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502]
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
> 14/12/16 07:40:47 INFO SendingConnection: Connected to
> [dev-myserver-2.abc.cloud/10.40.13.195:52502], 1 messages pending
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-1.abc.cloud:49458 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-3.abc.cloud:58579 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on dev-myserver-2.abc.cloud:52502 (size: 1813.0 B, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-3.abc.cloud:58579 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-2.abc.cloud:52502 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:48 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory
> on dev-myserver-1.abc.cloud:49458 (size: 32.5 KB, free: 1060.0 MB)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID
> 6, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID
> 7, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 19.0 in stage 0.0 (TID
> 8, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 23.0 in stage 0.0 (TID
> 9, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID
> 10, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID
> 11, dev-myserver-3.abc.cloud, NODE_LOCAL, 1203 bytes)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID
> 2) in 1964 ms on dev-myserver-1.abc.cloud (1/1000)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
> 0) in 2000 ms on dev-myserver-2.abc.cloud (2/1000)
> 14/12/16 07:40:49 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID
> 5) in 1975 ms on dev-myserver-1.abc.cloud (3/1000)
> ....
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>