Posted to user@spark.apache.org by myasuka <my...@live.com> on 2014/09/28 05:44:02 UTC

How to use multi thread in RDD map function ?

Hi, everyone
    I have run into a problem with increasing concurrency. In my program,
after the shuffle write, each node should fetch 16 pairs of matrices and
multiply each pair, like this:

import breeze.linalg.{DenseMatrix => BDM}

// Each element of pairs is (block ID, (left block, right block)).
pairs.map(t => {
  val b1 = t._2._1.asInstanceOf[BDM[Double]]
  val b2 = t._2._2.asInstanceOf[BDM[Double]]

  // Dense matrix product of the two blocks.
  val c = (b1 * b2).asInstanceOf[BDM[Double]]

  (new BlockID(t._1.row, t._1.column), c)
})
 
    Each node has 16 cores. However, no matter whether I set 16 tasks or more
per node, the concurrency (CPU usage) never gets above 60%, which means not
every core on the node is computing. When I check the task log in the web UI,
the amount of shuffle read and write per task shows that some tasks perform
one matrix multiplication, some perform two, and some perform none.

    So I am thinking of using Java multithreading to increase the
concurrency. I wrote a Scala program that uses Java threads, without Spark,
on a single node. Watching 'top', I see that this program can use up to 1500%
CPU (meaning nearly every core is computing). But I have no idea how to use
Java threads inside an RDD transformation.

    Can anyone provide example code for using Java multithreading in an RDD
transformation, or suggest another way to increase the concurrency?

Thanks, all





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-multi-thread-in-RDD-map-function-tp15286.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: How to use multi thread in RDD map function ?

Posted by myasuka <my...@live.com>.
Thank you for your reply.

   Actually, we already use this parameter. Our cluster is a standalone
cluster with 16 nodes, and every node has 16 cores. We have 256 pairs of
matrices and 256 tasks. When we set --total-executor-cores to 64, each node
can run 4 tasks simultaneously, each task performs one matrix multiplication,
and CPU usage is nearly 25%. When we set --total-executor-cores to 128, each
node can run 8 tasks simultaneously, but not every task performs exactly one
matrix multiplication, and CPU usage is nearly 35%. When we set
--total-executor-cores to 256, each node can run 16 tasks simultaneously, but
still not every task performs exactly one matrix multiplication (some perform
none, some perform two), and CPU usage is nearly 50%.
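
   As a rough check of these figures, the sketch below compares the measured
CPU usage with the ceiling you would expect if every running task kept
exactly one core busy. That one-core-per-task model and the even spread of
cores over nodes are assumptions, and the object and method names are made up
for illustration.

```scala
// Cluster figures quoted in this message: 16 nodes with 16 cores each.
object CoreUtilization {
  val nodes = 16
  val coresPerNode = 16
  val totalCores = nodes * coresPerNode // 256

  // Concurrent tasks per node, ASSUMING cores are spread evenly over nodes.
  def tasksPerNode(totalExecutorCores: Int): Int = totalExecutorCores / nodes

  // Ceiling on CPU usage in percent, ASSUMING each task keeps one core busy.
  def idealCpuPercent(totalExecutorCores: Int): Double =
    100.0 * totalExecutorCores / totalCores

  // CPU usage reported in this message, keyed by --total-executor-cores.
  val observed: Map[Int, Double] = Map(64 -> 25.0, 128 -> 35.0, 256 -> 50.0)

  def main(args: Array[String]): Unit =
    for (c <- Seq(64, 128, 256))
      println(f"cores=$c%3d tasks/node=${tasksPerNode(c)}%2d " +
        f"ideal=${idealCpuPercent(c)}%5.1f%% observed=${observed(c)}%5.1f%%")
}
```

   Under those assumptions 64 cores should give 25%, which matches, while 128
and 256 cores should give 50% and 100%, well above the 35% and 50% we
measure.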

   If we could increase the concurrency, CPU usage would go up and the
running time would go down.

   Hoping for a solution!


Qin Wei wrote
> Among the spark-submit options, two may help with your problem:
> "--total-executor-cores NUM" (standalone and Mesos only) and
> "--executor-cores" (YARN only).
> 
> 
> qinwei







Re: How to use multi thread in RDD map function ?

Posted by qinwei <we...@dewmobile.net>.





Among the spark-submit options, two may help with your problem: "--total-executor-cores NUM" (standalone and Mesos only) and "--executor-cores" (YARN only).


qinwei


Re: How to use multi thread in RDD map function ?

Posted by Sean Owen <so...@cloudera.com>.
If increasing executors really isn't enough, then you can consider using
mapPartitions to process whole partitions at a time. Within that you can
multithread your processing of the elements in the partition. (And you
should probably use more like one worker per machine then.)

The question is how to parallelize. If you can tolerate the input and
output being in memory, then you can make the Iterator into a parallel
collection and trivially map it in parallel locally. Otherwise you can
look at Future.traverse to iterate it in parallel instead, but I have not
tried it.
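
A minimal sketch of what such a per-partition function might look like, using
Future.traverse on a fixed thread pool. It is untested against Spark and rests
on several assumptions: the partition is materialised in memory (so this is
the in-memory case, done with Futures rather than a parallel collection,
because .par needs a separate module on Scala 2.13), plain nested arrays and a
naive multiply stand in for the Breeze matrices, and the names
ParallelPartition, multiply and multiplyPartition are hypothetical.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelPartition {
  // Stand-in for BDM[Double] so the sketch runs without Breeze (assumption).
  type Matrix = Array[Array[Double]]

  // Naive dense product; in the real job this would be Breeze's b1 * b2.
  def multiply(a: Matrix, b: Matrix): Matrix = {
    val inner = b.length
    Array.tabulate(a.length, b(0).length) { (i, j) =>
      var sum = 0.0
      var k = 0
      while (k < inner) { sum += a(i)(k) * b(k)(j); k += 1 }
      sum
    }
  }

  // Has the Iterator => Iterator shape that RDD.mapPartitions expects.
  def multiplyPartition[K](threads: Int)(
      it: Iterator[(K, (Matrix, Matrix))]): Iterator[(K, Matrix)] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Pulls the whole partition into memory before starting the work.
      val inputs = it.toVector
      // One Future per matrix pair; the pool size caps the parallelism.
      val all = Future.traverse(inputs) { case (key, (b1, b2)) =>
        Future((key, multiply(b1, b2)))
      }
      // Block until every product is done; input order is preserved.
      Await.result(all, Duration.Inf).iterator
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val a: Matrix = Array(Array(1.0, 2.0), Array(3.0, 4.0))
    val identity: Matrix = Array(Array(1.0, 0.0), Array(0.0, 1.0))
    val out = multiplyPartition[Int](4)(Iterator((0, (a, identity)), (1, (identity, a))))
    out.foreach { case (key, m) => println(s"$key: ${m.map(_.mkString(" ")).mkString(" | ")}") }
  }
}
```

In the job itself this would be wired in as something like
pairs.mapPartitions(it => ParallelPartition.multiplyPartition(16)(it)), with
fewer, larger partitions and roughly one concurrent task per machine so that
the threads inside a task are not competing with other tasks for cores.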