You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@mahout.apache.org by Reinis Vicups <ma...@orbit-x.de> on 2014/10/13 17:56:13 UTC

Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Hi,

I am currently testing SimilarityAnalysis.rowSimilarity and I am 
wondering, how could I increase number of tasks to use for distributed 
shuffle.

What I currently observe, is that SimilarityAnalysis is requiring almost 
20 minutes for my dataset only with this stage:

combineByKey at ABt.scala:126

When I view details for the stage I see that only one task is spawned 
running on one node.

I have my own implementation of SimilarityAnalysis and by tuning number 
of tasks I have reached HUGE performance gains.

Since I couldn't find how to pass the number of tasks to shuffle 
operations directly, I have set following in spark config

configuration = new SparkConf().setAppName(jobConfig.jobName)
         .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
         .set("spark.kryo.registrator", 
"org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
         .set("spark.kryo.referenceTracking", "false")
         .set("spark.kryoserializer.buffer.mb", "200")
         .set("spark.default.parallelism", 400) // <- this is the line 
supposed to set default parallelism to some high number

Thank you for your help
reinis


Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Reinis Vicups <ma...@orbit-x.de>.
 >Let me try this on my own cluster and see if I can reproduce. Are you 
using text files or have you gotten HBase working? How large is the input?

Using HBase as input (it's TFIDF-vector generated by seq2sparse) if I 
recall correctly about 30k documents with dimensionality of around 500k 
dimensions. I don't know how to get the table-size from HBase but the 
TFIDF-Vector hfile is bit over 20 MB


On 13.10.2014 19:11, Pat Ferrel wrote:
> But if it doesn’t work in the Spark config it probably won’t work here either. There may be something else causing only one task for nongraph ABt.
>
> Dmitriy? Any ideas
>
> Let me try this on my own cluster and see if I can reproduce. Are you using text files or have you gotten HBase working? How large is the input?
>
>
> On Oct 13, 2014, at 10:03 AM, Patrick Ferrel <pa...@occamsmachete.com> wrote:
>
> Best guess is, as I said, to put it in RowSimilarityDriver.start like this:
>     sparkConf.set("spark.kryo.referenceTracking", "false")
>       .set("spark.kryoserializer.buffer.mb", "200")
>       .set("spark.executor.memory",
> parser.opts("sparkExecutorMem").asInstanceOf[String])
>       .set("spark.default.parallelism", 400)
>
> The multiply happens during lazy evaluation and is executed by the blas
> optimizer, when the result is required. This means that many failures will
> seem to happen in the write since this is the first place where the values
> are accessed in a non-lazy manner.
>
> Actually we should probably not be hard coding the
> spark.kryoserializer.buffer.mb either.
>
> On Mon, Oct 13, 2014 at 9:54 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
>
>> Hello,
>>
>> When you set the Spark config as below do you still get one task?
>> Unfortunately yes.
>>
>> Currently I am looking for the very first shuffle stage in
>> SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of
>> mapping, wrapping and caching during SimilarityAnalysis#sampleDownAndBinarizeand
>> I don't get where to look for the code of "%*%" in:
>>
>> // Compute row similarity cooccurrence matrix AA'
>> val drmAAt = drmA %*% drmA.t
>>
>> I would like to hard code partition number in that first shuffle just for
>> the sake of experiment.
>>
>>
>> On 13.10.2014 18:29, Pat Ferrel wrote:
>>
>>> I see no place where the spark.default.parallelism is set so your config
>>> can be set it to whatever you wish. When you set the Spark config as below
>>> do you still get one task? The test suite sets the
>>> spark.default.parallelism to 10 before the context is initialized. To do
>>> this with the SimilarityAnalysis.rowSimilarity (here I assume you are
>>> modifying the driver) put the  .set("spark.default.parallelism", 400) in
>>> RowSimilarityDriver.start and see if that changes things.
>>>
>>> If this doesn’t work it may be that the blas optimizer is doing something
>>> with the value but I’m lost in that code There is only one place the value
>>> is read, which is in Par.scala
>>>
>>>          // auto adjustment, try to scale up to either x1Size or x2Size.
>>>          val clusterSize = rdd.context.getConf.get("spark.default.parallelism",
>>> "1").toInt
>>>
>>>          val x1Size = (clusterSize * .95).ceil.toInt
>>>          val x2Size = (clusterSize * 1.9).ceil.toInt
>>>
>>>          if (rdd.partitions.size <= x1Size)
>>>            rdd.coalesce(numPartitions = x1Size, shuffle = true)
>>>          else if (rdd.partitions.size <= x2Size)
>>>            rdd.coalesce(numPartitions = x2Size, shuffle = true)
>>>          else
>>>            rdd.coalesce(numPartitions = rdd.partitions.size)
>>>
>>>
>>> Dmitriy can you shed any light on the use of spark.default.parallelism,
>>> how to increase it or how to get more than one task created when performing
>>> ABt?
>>>
>>>
>>> On Oct 13, 2014, at 8:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
>>>
>>> Hi,
>>>
>>> I am currently testing SimilarityAnalysis.rowSimilarity and I am
>>> wondering, how could I increase number of tasks to use for distributed
>>> shuffle.
>>>
>>> What I currently observe, is that SimilarityAnalysis is requiring almost
>>> 20 minutes for my dataset only with this stage:
>>>
>>> combineByKey at ABt.scala:126
>>>
>>> When I view details for the stage I see that only one task is spawned
>>> running on one node.
>>>
>>> I have my own implementation of SimilarityAnalysis and by tuning number
>>> of tasks I have reached HUGE performance gains.
>>>
>>> Since I couldn't find how to pass the number of tasks to shuffle
>>> operations directly, I have set following in spark config
>>>
>>> configuration = new SparkConf().setAppName(jobConfig.jobName)
>>>         .set("spark.serializer", "org.apache.spark.serializer.
>>> KryoSerializer")
>>>         .set("spark.kryo.registrator", "org.apache.mahout.
>>> sparkbindings.io.MahoutKryoRegistrator")
>>>         .set("spark.kryo.referenceTracking", "false")
>>>         .set("spark.kryoserializer.buffer.mb", "200")
>>>         .set("spark.default.parallelism", 400) // <- this is the line
>>> supposed to set default parallelism to some high number
>>>
>>> Thank you for your help
>>> reinis
>>>
>>>


Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Pat Ferrel <pa...@occamsmachete.com>.
But if it doesn’t work in the Spark config it probably won’t work here either. There may be something else causing only one task for nongraph ABt.

Dmitriy? Any ideas

Let me try this on my own cluster and see if I can reproduce. Are you using text files or have you gotten HBase working? How large is the input?


On Oct 13, 2014, at 10:03 AM, Patrick Ferrel <pa...@occamsmachete.com> wrote:

Best guess is, as I said, to put it in RowSimilarityDriver.start like this:
   sparkConf.set("spark.kryo.referenceTracking", "false")
     .set("spark.kryoserializer.buffer.mb", "200")
     .set("spark.executor.memory",
parser.opts("sparkExecutorMem").asInstanceOf[String])
     .set("spark.default.parallelism", 400)

The multiply happens during lazy evaluation and is executed by the blas
optimizer, when the result is required. This means that many failures will
seem to happen in the write since this is the first place where the values
are accessed in a non-lazy manner.

Actually we should probably not be hard coding the
spark.kryoserializer.buffer.mb either.

On Mon, Oct 13, 2014 at 9:54 AM, Reinis Vicups <ma...@orbit-x.de> wrote:

> Hello,
> 
> When you set the Spark config as below do you still get one task?
>> 
> 
> Unfortunately yes.
> 
> Currently I am looking for the very first shuffle stage in
> SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of
> mapping, wrapping and caching during SimilarityAnalysis#sampleDownAndBinarizeand
> I don't get where to look for the code of "%*%" in:
> 
> // Compute row similarity cooccurrence matrix AA'
> val drmAAt = drmA %*% drmA.t
> 
> I would like to hard code partition number in that first shuffle just for
> the sake of experiment.
> 
> 
> On 13.10.2014 18:29, Pat Ferrel wrote:
> 
>> I see no place where the spark.default.parallelism is set so your config
>> can be set it to whatever you wish. When you set the Spark config as below
>> do you still get one task? The test suite sets the
>> spark.default.parallelism to 10 before the context is initialized. To do
>> this with the SimilarityAnalysis.rowSimilarity (here I assume you are
>> modifying the driver) put the  .set("spark.default.parallelism", 400) in
>> RowSimilarityDriver.start and see if that changes things.
>> 
>> If this doesn’t work it may be that the blas optimizer is doing something
>> with the value but I’m lost in that code There is only one place the value
>> is read, which is in Par.scala
>> 
>>         // auto adjustment, try to scale up to either x1Size or x2Size.
>>         val clusterSize = rdd.context.getConf.get("spark.default.parallelism",
>> "1").toInt
>> 
>>         val x1Size = (clusterSize * .95).ceil.toInt
>>         val x2Size = (clusterSize * 1.9).ceil.toInt
>> 
>>         if (rdd.partitions.size <= x1Size)
>>           rdd.coalesce(numPartitions = x1Size, shuffle = true)
>>         else if (rdd.partitions.size <= x2Size)
>>           rdd.coalesce(numPartitions = x2Size, shuffle = true)
>>         else
>>           rdd.coalesce(numPartitions = rdd.partitions.size)
>> 
>> 
>> Dmitriy can you shed any light on the use of spark.default.parallelism,
>> how to increase it or how to get more than one task created when performing
>> ABt?
>> 
>> 
>> On Oct 13, 2014, at 8:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
>> 
>> Hi,
>> 
>> I am currently testing SimilarityAnalysis.rowSimilarity and I am
>> wondering, how could I increase number of tasks to use for distributed
>> shuffle.
>> 
>> What I currently observe, is that SimilarityAnalysis is requiring almost
>> 20 minutes for my dataset only with this stage:
>> 
>> combineByKey at ABt.scala:126
>> 
>> When I view details for the stage I see that only one task is spawned
>> running on one node.
>> 
>> I have my own implementation of SimilarityAnalysis and by tuning number
>> of tasks I have reached HUGE performance gains.
>> 
>> Since I couldn't find how to pass the number of tasks to shuffle
>> operations directly, I have set following in spark config
>> 
>> configuration = new SparkConf().setAppName(jobConfig.jobName)
>>        .set("spark.serializer", "org.apache.spark.serializer.
>> KryoSerializer")
>>        .set("spark.kryo.registrator", "org.apache.mahout.
>> sparkbindings.io.MahoutKryoRegistrator")
>>        .set("spark.kryo.referenceTracking", "false")
>>        .set("spark.kryoserializer.buffer.mb", "200")
>>        .set("spark.default.parallelism", 400) // <- this is the line
>> supposed to set default parallelism to some high number
>> 
>> Thank you for your help
>> reinis
>> 
>> 
> 


Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Patrick Ferrel <pa...@occamsmachete.com>.
Best guess is, as I said, to put it in RowSimilarityDriver.start like this:
    sparkConf.set("spark.kryo.referenceTracking", "false")
      .set("spark.kryoserializer.buffer.mb", "200")
      .set("spark.executor.memory",
parser.opts("sparkExecutorMem").asInstanceOf[String])
      .set("spark.default.parallelism", 400)

The multiply happens during lazy evaluation and is executed by the blas
optimizer, when the result is required. This means that many failures will
seem to happen in the write since this is the first place where the values
are accessed in a non-lazy manner.

Actually we should probably not be hard coding the
spark.kryoserializer.buffer.mb either.

On Mon, Oct 13, 2014 at 9:54 AM, Reinis Vicups <ma...@orbit-x.de> wrote:

> Hello,
>
>  When you set the Spark config as below do you still get one task?
>>
>
> Unfortunately yes.
>
> Currently I am looking for the very first shuffle stage in
> SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of
> mapping, wrapping and caching during SimilarityAnalysis#sampleDownAndBinarizeand
> I don't get where to look for the code of "%*%" in:
>
> // Compute row similarity cooccurrence matrix AA'
> val drmAAt = drmA %*% drmA.t
>
> I would like to hard code partition number in that first shuffle just for
> the sake of experiment.
>
>
> On 13.10.2014 18:29, Pat Ferrel wrote:
>
>> I see no place where the spark.default.parallelism is set so your config
>> can be set it to whatever you wish. When you set the Spark config as below
>> do you still get one task? The test suite sets the
>> spark.default.parallelism to 10 before the context is initialized. To do
>> this with the SimilarityAnalysis.rowSimilarity (here I assume you are
>> modifying the driver) put the  .set("spark.default.parallelism", 400) in
>> RowSimilarityDriver.start and see if that changes things.
>>
>> If this doesn’t work it may be that the blas optimizer is doing something
>> with the value but I’m lost in that code There is only one place the value
>> is read, which is in Par.scala
>>
>>          // auto adjustment, try to scale up to either x1Size or x2Size.
>>          val clusterSize = rdd.context.getConf.get("spark.default.parallelism",
>> "1").toInt
>>
>>          val x1Size = (clusterSize * .95).ceil.toInt
>>          val x2Size = (clusterSize * 1.9).ceil.toInt
>>
>>          if (rdd.partitions.size <= x1Size)
>>            rdd.coalesce(numPartitions = x1Size, shuffle = true)
>>          else if (rdd.partitions.size <= x2Size)
>>            rdd.coalesce(numPartitions = x2Size, shuffle = true)
>>          else
>>            rdd.coalesce(numPartitions = rdd.partitions.size)
>>
>>
>> Dmitriy can you shed any light on the use of spark.default.parallelism,
>> how to increase it or how to get more than one task created when performing
>> ABt?
>>
>>
>> On Oct 13, 2014, at 8:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
>>
>> Hi,
>>
>> I am currently testing SimilarityAnalysis.rowSimilarity and I am
>> wondering, how could I increase number of tasks to use for distributed
>> shuffle.
>>
>> What I currently observe, is that SimilarityAnalysis is requiring almost
>> 20 minutes for my dataset only with this stage:
>>
>> combineByKey at ABt.scala:126
>>
>> When I view details for the stage I see that only one task is spawned
>> running on one node.
>>
>> I have my own implementation of SimilarityAnalysis and by tuning number
>> of tasks I have reached HUGE performance gains.
>>
>> Since I couldn't find how to pass the number of tasks to shuffle
>> operations directly, I have set following in spark config
>>
>> configuration = new SparkConf().setAppName(jobConfig.jobName)
>>         .set("spark.serializer", "org.apache.spark.serializer.
>> KryoSerializer")
>>         .set("spark.kryo.registrator", "org.apache.mahout.
>> sparkbindings.io.MahoutKryoRegistrator")
>>         .set("spark.kryo.referenceTracking", "false")
>>         .set("spark.kryoserializer.buffer.mb", "200")
>>         .set("spark.default.parallelism", 400) // <- this is the line
>> supposed to set default parallelism to some high number
>>
>> Thank you for your help
>> reinis
>>
>>
>

Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Reinis Vicups <ma...@orbit-x.de>.
Hello,

>When you set the Spark config as below do you still get one task?

Unfortunately yes.

Currently I am looking for the very first shuffle stage in 
SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of 
mapping, wrapping and caching during 
SimilarityAnalysis#sampleDownAndBinarizeand I don't get where to look 
for the code of "%*%" in:

// Compute row similarity cooccurrence matrix AA'
val drmAAt = drmA %*% drmA.t

I would like to hard code partition number in that first shuffle just 
for the sake of experiment.

On 13.10.2014 18:29, Pat Ferrel wrote:
> I see no place where the spark.default.parallelism is set so your config can be set it to whatever you wish. When you set the Spark config as below do you still get one task? The test suite sets the spark.default.parallelism to 10 before the context is initialized. To do this with the SimilarityAnalysis.rowSimilarity (here I assume you are modifying the driver) put the  .set("spark.default.parallelism", 400) in RowSimilarityDriver.start and see if that changes things.
>
> If this doesn’t work it may be that the blas optimizer is doing something with the value but I’m lost in that code There is only one place the value is read, which is in Par.scala
>
>          // auto adjustment, try to scale up to either x1Size or x2Size.
>          val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt
>
>          val x1Size = (clusterSize * .95).ceil.toInt
>          val x2Size = (clusterSize * 1.9).ceil.toInt
>
>          if (rdd.partitions.size <= x1Size)
>            rdd.coalesce(numPartitions = x1Size, shuffle = true)
>          else if (rdd.partitions.size <= x2Size)
>            rdd.coalesce(numPartitions = x2Size, shuffle = true)
>          else
>            rdd.coalesce(numPartitions = rdd.partitions.size)
>
>
> Dmitriy can you shed any light on the use of spark.default.parallelism, how to increase it or how to get more than one task created when performing ABt?
>
>
> On Oct 13, 2014, at 8:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
>
> Hi,
>
> I am currently testing SimilarityAnalysis.rowSimilarity and I am wondering, how could I increase number of tasks to use for distributed shuffle.
>
> What I currently observe, is that SimilarityAnalysis is requiring almost 20 minutes for my dataset only with this stage:
>
> combineByKey at ABt.scala:126
>
> When I view details for the stage I see that only one task is spawned running on one node.
>
> I have my own implementation of SimilarityAnalysis and by tuning number of tasks I have reached HUGE performance gains.
>
> Since I couldn't find how to pass the number of tasks to shuffle operations directly, I have set following in spark config
>
> configuration = new SparkConf().setAppName(jobConfig.jobName)
>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>         .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
>         .set("spark.kryo.referenceTracking", "false")
>         .set("spark.kryoserializer.buffer.mb", "200")
>         .set("spark.default.parallelism", 400) // <- this is the line supposed to set default parallelism to some high number
>
> Thank you for your help
> reinis
>


Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Pat Ferrel <pa...@occamsmachete.com>.
I see no place where the spark.default.parallelism is set so your config can be set it to whatever you wish. When you set the Spark config as below do you still get one task? The test suite sets the spark.default.parallelism to 10 before the context is initialized. To do this with the SimilarityAnalysis.rowSimilarity (here I assume you are modifying the driver) put the  .set("spark.default.parallelism", 400) in RowSimilarityDriver.start and see if that changes things.

If this doesn’t work it may be that the blas optimizer is doing something with the value but I’m lost in that code There is only one place the value is read, which is in Par.scala

        // auto adjustment, try to scale up to either x1Size or x2Size.
        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt

        val x1Size = (clusterSize * .95).ceil.toInt
        val x2Size = (clusterSize * 1.9).ceil.toInt

        if (rdd.partitions.size <= x1Size)
          rdd.coalesce(numPartitions = x1Size, shuffle = true)
        else if (rdd.partitions.size <= x2Size)
          rdd.coalesce(numPartitions = x2Size, shuffle = true)
        else
          rdd.coalesce(numPartitions = rdd.partitions.size)


Dmitriy can you shed any light on the use of spark.default.parallelism, how to increase it or how to get more than one task created when performing ABt?


On Oct 13, 2014, at 8:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:

Hi,

I am currently testing SimilarityAnalysis.rowSimilarity and I am wondering, how could I increase number of tasks to use for distributed shuffle.

What I currently observe, is that SimilarityAnalysis is requiring almost 20 minutes for my dataset only with this stage:

combineByKey at ABt.scala:126

When I view details for the stage I see that only one task is spawned running on one node.

I have my own implementation of SimilarityAnalysis and by tuning number of tasks I have reached HUGE performance gains.

Since I couldn't find how to pass the number of tasks to shuffle operations directly, I have set following in spark config

configuration = new SparkConf().setAppName(jobConfig.jobName)
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
       .set("spark.kryo.referenceTracking", "false")
       .set("spark.kryoserializer.buffer.mb", "200")
       .set("spark.default.parallelism", 400) // <- this is the line supposed to set default parallelism to some high number

Thank you for your help
reinis



Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Reinis Vicups <ma...@orbit-x.de>.
Of course the number of partitions/tasks shall be configurable, I am 
just saying that in my experiments I have observed a close-to-linear 
performance increase just by increasing number of partitions/tasks 
(which was absolutely not the case with map-reduce).

I am assuming that spark is not "smart" enough to set optimal values for 
the parallelism. I recall reading someplace that the default is number 
of CPUs or 2 - whatever is larger. Because of the task nature (if I am 
not mistaken, those are wrapped akka actors) it is possible to 
efficiently execute a way higher number of tasks per CPU. They suggest 
this 
http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism but 
I have observed sometimes considerable performance gains when increasing 
number of tasks to 8 or even to 16 per CPU core.

On 13.10.2014 18:53, Pat Ferrel wrote:
> There is a possibility that we are doing something with partitioning that interferes but I think Ted’s point is that Spark should do the right thing in most cases—unless we interfere. Those values are meant for tuning to the exact job you are doing, but it may not be appropriate for us to hard code them. We could allow the CLI to set them like we do with -sem if needed.
>
> Let’s see what Dmitriy thinks about why only one task is being created.
>
> On Oct 13, 2014, at 9:32 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
>
> Hi,
>
>> Do you think that simply increasing this parameter is a safe and sane thing
>> to do?
> Why would it be unsafe?
>
> In my own implementation I am using 400 tasks on my 4-node-2cpu cluster and the execution times of largest shuffle stage have dropped around 10 times.
> I have number of test values back from the time when I used "old" RowSimilarityJob and with some exceptions (I guess due to randomized sparsization) I still have approx. the same values with my own row similarity implementation.
>
> reinis
>
> On 13.10.2014 18:06, Ted Dunning wrote:
>> On Mon, Oct 13, 2014 at 11:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
>>
>>> I have my own implementation of SimilarityAnalysis and by tuning number of
>>> tasks I have reached HUGE performance gains.
>>>
>>> Since I couldn't find how to pass the number of tasks to shuffle
>>> operations directly, I have set following in spark config
>>>
>>> configuration = new SparkConf().setAppName(jobConfig.jobName)
>>>          .set("spark.serializer", "org.apache.spark.serializer.
>>> KryoSerializer")
>>>          .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io
>>> .MahoutKryoRegistrator")
>>>          .set("spark.kryo.referenceTracking", "false")
>>>          .set("spark.kryoserializer.buffer.mb", "200")
>>>          .set("spark.default.parallelism", 400) // <- this is the line
>>> supposed to set default parallelism to some high number
>>>
>>> Thank you for your help
>>>
>> Thank you for YOUR help!
>>
>> Do you think that simply increasing this parameter is a safe and sane thing
>> to do?
>>


Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Pat Ferrel <pa...@occamsmachete.com>.
There is a possibility that we are doing something with partitioning that interferes but I think Ted’s point is that Spark should do the right thing in most cases—unless we interfere. Those values are meant for tuning to the exact job you are doing, but it may not be appropriate for us to hard code them. We could allow the CLI to set them like we do with -sem if needed.

Let’s see what Dmitriy thinks about why only one task is being created.

On Oct 13, 2014, at 9:32 AM, Reinis Vicups <ma...@orbit-x.de> wrote:

Hi,

> Do you think that simply increasing this parameter is a safe and sane thing
> to do?

Why would it be unsafe?

In my own implementation I am using 400 tasks on my 4-node-2cpu cluster and the execution times of largest shuffle stage have dropped around 10 times.
I have number of test values back from the time when I used "old" RowSimilarityJob and with some exceptions (I guess due to randomized sparsization) I still have approx. the same values with my own row similarity implementation.

reinis

On 13.10.2014 18:06, Ted Dunning wrote:
> On Mon, Oct 13, 2014 at 11:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
> 
>> I have my own implementation of SimilarityAnalysis and by tuning number of
>> tasks I have reached HUGE performance gains.
>> 
>> Since I couldn't find how to pass the number of tasks to shuffle
>> operations directly, I have set following in spark config
>> 
>> configuration = new SparkConf().setAppName(jobConfig.jobName)
>>         .set("spark.serializer", "org.apache.spark.serializer.
>> KryoSerializer")
>>         .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io
>> .MahoutKryoRegistrator")
>>         .set("spark.kryo.referenceTracking", "false")
>>         .set("spark.kryoserializer.buffer.mb", "200")
>>         .set("spark.default.parallelism", 400) // <- this is the line
>> supposed to set default parallelism to some high number
>> 
>> Thank you for your help
>> 
> Thank you for YOUR help!
> 
> Do you think that simply increasing this parameter is a safe and sane thing
> to do?
> 



Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Pat Ferrel <pa...@gmail.com>.
I’ll add a new option to escape any spark options and put them directly into the SparkConf for the job before the context is created.

The CLI will be something like -D xxx=yyy so for this case you can change the default parallelism with 

-D spark.default.parallelism=400

If the logic holds that you can often have 16 to 8 x your number of cores then running locally on my laptop with local[7] should have -D spark.default.parallelism=112 or 56

If you want this value set for your entire cluster you should be able to set it in the conf files when you launch the cluster. We don’t change any of those values in the client except spark.executor.memory (only if specified) and any escaped values. 

On Oct 13, 2014, at 11:32 AM, Ted Dunning <te...@gmail.com> wrote:

On Mon, Oct 13, 2014 at 12:32 PM, Reinis Vicups <ma...@orbit-x.de> wrote:

> 
> Do you think that simply increasing this parameter is a safe and sane
>> thing
>> to do?
>> 
> 
> Why would it be unsafe?
> 
> In my own implementation I am using 400 tasks on my 4-node-2cpu cluster
> and the execution times of largest shuffle stage have dropped around 10
> times.
> I have number of test values back from the time when I used "old"
> RowSimilarityJob and with some exceptions (I guess due to randomized
> sparsization) I still have approx. the same values with my own row
> similarity implementation.
> 

Splitting things too far can make processes much less efficient.  Setting
parameters like this may propagate further than desired.

I asked because I don't know, however.


Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Ted Dunning <te...@gmail.com>.
On Mon, Oct 13, 2014 at 12:32 PM, Reinis Vicups <ma...@orbit-x.de> wrote:

>
>  Do you think that simply increasing this parameter is a safe and sane
>> thing
>> to do?
>>
>
> Why would it be unsafe?
>
> In my own implementation I am using 400 tasks on my 4-node-2cpu cluster
> and the execution times of largest shuffle stage have dropped around 10
> times.
> I have number of test values back from the time when I used "old"
> RowSimilarityJob and with some exceptions (I guess due to randomized
> sparsization) I still have approx. the same values with my own row
> similarity implementation.
>

Splitting things too far can make processes much less efficient.  Setting
parameters like this may propagate further than desired.

I asked because I don't know, however.

Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Reinis Vicups <ma...@orbit-x.de>.
Hi,

>Do you think that simply increasing this parameter is a safe and sane thing
>to do?

Why would it be unsafe?

In my own implementation I am using 400 tasks on my 4-node-2cpu cluster 
and the execution times of largest shuffle stage have dropped around 10 
times.
I have number of test values back from the time when I used "old" 
RowSimilarityJob and with some exceptions (I guess due to randomized 
sparsization) I still have approx. the same values with my own row 
similarity implementation.

reinis

On 13.10.2014 18:06, Ted Dunning wrote:
> On Mon, Oct 13, 2014 at 11:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
>
>> I have my own implementation of SimilarityAnalysis and by tuning number of
>> tasks I have reached HUGE performance gains.
>>
>> Since I couldn't find how to pass the number of tasks to shuffle
>> operations directly, I have set following in spark config
>>
>> configuration = new SparkConf().setAppName(jobConfig.jobName)
>>          .set("spark.serializer", "org.apache.spark.serializer.
>> KryoSerializer")
>>          .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io
>> .MahoutKryoRegistrator")
>>          .set("spark.kryo.referenceTracking", "false")
>>          .set("spark.kryoserializer.buffer.mb", "200")
>>          .set("spark.default.parallelism", 400) // <- this is the line
>> supposed to set default parallelism to some high number
>>
>> Thank you for your help
>>
> Thank you for YOUR help!
>
> Do you think that simply increasing this parameter is a safe and sane thing
> to do?
>


Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity

Posted by Ted Dunning <te...@gmail.com>.
On Mon, Oct 13, 2014 at 11:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:

> I have my own implementation of SimilarityAnalysis and by tuning number of
> tasks I have reached HUGE performance gains.
>
> Since I couldn't find how to pass the number of tasks to shuffle
> operations directly, I have set following in spark config
>
> configuration = new SparkConf().setAppName(jobConfig.jobName)
>         .set("spark.serializer", "org.apache.spark.serializer.
> KryoSerializer")
>         .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io
> .MahoutKryoRegistrator")
>         .set("spark.kryo.referenceTracking", "false")
>         .set("spark.kryoserializer.buffer.mb", "200")
>         .set("spark.default.parallelism", 400) // <- this is the line
> supposed to set default parallelism to some high number
>
> Thank you for your help
>

Thank you for YOUR help!

Do you think that simply increasing this parameter is a safe and sane thing
to do?