Posted to user@mahout.apache.org by Reinis Vicups <ma...@orbit-x.de> on 2014/10/13 17:56:13 UTC
Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Hi,
I am currently testing SimilarityAnalysis.rowSimilarity and I am
wondering how I could increase the number of tasks used for the distributed
shuffle.
What I currently observe is that SimilarityAnalysis takes almost
20 minutes on my dataset for this stage alone:
combineByKey at ABt.scala:126
When I view the details for the stage, I see that only one task is spawned,
running on one node.
I have my own implementation of SimilarityAnalysis, and by tuning the number
of tasks I have achieved HUGE performance gains.
Since I couldn't find how to pass the number of tasks to shuffle
operations directly, I have set the following in the Spark config:
configuration = new SparkConf().setAppName(jobConfig.jobName)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
  .set("spark.kryo.referenceTracking", "false")
  .set("spark.kryoserializer.buffer.mb", "200")
  .set("spark.default.parallelism", "400") // <- this line is supposed to set the default parallelism to a high number
Thank you for your help
reinis
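Spark runs one task per partition of a stage, so if the data going through the combineByKey shuffle sits in a single partition, the stage gets a single task no matter how large the cluster is. The Spark-free sketch below illustrates this with hash partitioning, assuming the same rule Spark's HashPartitioner uses (non-negative key.hashCode modulo the partition count). The object and method names are hypothetical, and the 30,000 integer keys are illustrative only.

```scala
object HashPartitionSketch {
  // Hypothetical helper mirroring the idea behind Spark's HashPartitioner:
  // a key goes to the non-negative remainder of key.hashCode modulo numPartitions.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // How many keys land in each partition; each partition becomes one task in the stage.
  def keysPerPartition(keys: Seq[Any], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionOf(_, numPartitions)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    val rowKeys = 0 until 30000 // illustrative: one Int key per document row
    println(keysPerPartition(rowKeys, 1))      // Map(0 -> 30000): a single task does all the work
    println(keysPerPartition(rowKeys, 8).size) // 8: the same keys spread over 8 tasks
  }
}
```

The partition count, not the cluster size, bounds how many tasks can share the work.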
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Reinis Vicups <ma...@orbit-x.de>.
>Let me try this on my own cluster and see if I can reproduce. Are you
using text files or have you gotten HBase working? How large is the input?
Using HBase as input (it's the TF-IDF vector generated by seq2sparse). If I
recall correctly, it's about 30k documents with a dimensionality of around 500k.
I don't know how to get the table size from HBase, but the
TF-IDF vector HFile is a bit over 20 MB.
On 13.10.2014 19:11, Pat Ferrel wrote:
> But if it doesn’t work in the Spark config it probably won’t work here either. There may be something else causing only one task for nongraph ABt.
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Pat Ferrel <pa...@occamsmachete.com>.
But if it doesn’t work in the Spark config, it probably won’t work here either. There may be something else causing only one task for nongraph ABt.
Dmitriy, any ideas?
Let me try this on my own cluster and see if I can reproduce it. Are you using text files, or have you gotten HBase working? How large is the input?
On Oct 13, 2014, at 10:03 AM, Patrick Ferrel <pa...@occamsmachete.com> wrote:
Best guess is, as I said, to put it in RowSimilarityDriver.start like this:
sparkConf.set("spark.kryo.referenceTracking", "false")
.set("spark.kryoserializer.buffer.mb", "200")
.set("spark.executor.memory",
parser.opts("sparkExecutorMem").asInstanceOf[String])
.set("spark.default.parallelism", 400)
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Patrick Ferrel <pa...@occamsmachete.com>.
Best guess is, as I said, to put it in RowSimilarityDriver.start like this:
sparkConf.set("spark.kryo.referenceTracking", "false")
  .set("spark.kryoserializer.buffer.mb", "200")
  .set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String])
  .set("spark.default.parallelism", "400")
The multiply happens during lazy evaluation and is executed by the blas
optimizer when the result is required. This means that many failures will
seem to happen in the write, since that is the first place where the values
are accessed in a non-lazy manner.
Actually, we should probably not be hard-coding
spark.kryoserializer.buffer.mb either.
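The point about lazy evaluation (the multiply only runs when its result is needed, so errors appear to come from the write) can be illustrated without Spark or Mahout using a Scala view, which defers map until something materializes it. This is only an analogy and assumes nothing about the Mahout optimizer's internals; the object and method names are hypothetical.

```scala
object LazyFailureSketch {
  // Returns (elements computed before the "write", error message raised during the "write").
  def run(): (Int, String) = {
    var computed = 0
    // Build a lazy "plan": defining the view computes nothing.
    val plan = (1 to 5).view.map { i =>
      computed += 1
      if (i == 3) throw new IllegalStateException(s"bad row $i") else i * i
    }
    val computedBeforeWrite = computed
    // The "write" forces evaluation, so the failure surfaces here, not where `plan` was defined.
    val error =
      try { plan.toList; "no error" }
      catch { case e: IllegalStateException => e.getMessage }
    (computedBeforeWrite, error)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Nothing is computed until `toList`, so a bad element is only reported at that point, just as a failing distributed multiply is only reported when its output is written.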
On Mon, Oct 13, 2014 at 9:54 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
> Hello,
>
>> When you set the Spark config as below do you still get one task?
>
> Unfortunately yes.
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Reinis Vicups <ma...@orbit-x.de>.
Hello,
>When you set the Spark config as below do you still get one task?
Unfortunately, yes.
Currently I am looking for the very first shuffle stage in
SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of
mapping, wrapping and caching in
SimilarityAnalysis#sampleDownAndBinarize, and I don't know where to look
for the code of "%*%" in:
// Compute row similarity cooccurrence matrix AA'
val drmAAt = drmA %*% drmA.t
I would like to hard-code the partition number in that first shuffle, just
for the sake of experiment.
On 13.10.2014 18:29, Pat Ferrel wrote:
> I see no place where the spark.default.parallelism is set so your config can be set it to whatever you wish. When you set the Spark config as below do you still get one task? The test suite sets the spark.default.parallelism to 10 before the context is initialized. To do this with the SimilarityAnalysis.rowSimilarity (here I assume you are modifying the driver) put the .set("spark.default.parallelism", 400) in RowSimilarityDriver.start and see if that changes things.
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Pat Ferrel <pa...@occamsmachete.com>.
I see no place where spark.default.parallelism is set, so you can set it in your config to whatever you wish. When you set the Spark config as in your message, do you still get one task? The test suite sets spark.default.parallelism to 10 before the context is initialized. To do this with SimilarityAnalysis.rowSimilarity (here I assume you are modifying the driver), put .set("spark.default.parallelism", "400") in RowSimilarityDriver.start and see if that changes things.
If this doesn’t work, it may be that the blas optimizer is doing something with the value, but I’m lost in that code. There is only one place the value is read, which is in Par.scala:
// auto adjustment, try to scale up to either x1Size or x2Size.
val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt

val x1Size = (clusterSize * .95).ceil.toInt
val x2Size = (clusterSize * 1.9).ceil.toInt

if (rdd.partitions.size <= x1Size)
  rdd.coalesce(numPartitions = x1Size, shuffle = true)
else if (rdd.partitions.size <= x2Size)
  rdd.coalesce(numPartitions = x2Size, shuffle = true)
else
  rdd.coalesce(numPartitions = rdd.partitions.size)
Dmitriy, can you shed any light on the use of spark.default.parallelism, how to increase it, or how to get more than one task created when performing ABt?
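To make the quoted Par.scala branch easier to reason about, here it is restated as a pure function that returns the partition count the coalesce call would target, instead of calling coalesce on an RDD. This is an illustrative sketch: the object and method names are hypothetical, and the setting is modeled as an Option[String] in place of SparkConf.get with its "1" fallback. The thresholds are copied from the snippet.

```scala
object ParAutoAdjustSketch {
  // Restates the auto-adjust branch: given spark.default.parallelism (None = not set)
  // and the current partition count, return the partition count coalesce would target.
  def targetPartitions(defaultParallelism: Option[String], currentPartitions: Int): Int = {
    val clusterSize = defaultParallelism.getOrElse("1").toInt // same "1" fallback as Par.scala
    val x1Size = (clusterSize * .95).ceil.toInt
    val x2Size = (clusterSize * 1.9).ceil.toInt
    if (currentPartitions <= x1Size) x1Size      // coalesce(x1Size, shuffle = true)
    else if (currentPartitions <= x2Size) x2Size // coalesce(x2Size, shuffle = true)
    else currentPartitions                       // coalesce to the same count: no change
  }

  def main(args: Array[String]): Unit = {
    println(targetPartitions(None, 1))          // setting absent: stays at 1 partition
    println(targetPartitions(Some("400"), 1))   // 380
    println(targetPartitions(Some("400"), 500)) // 760
  }
}
```

With the setting absent, clusterSize is 1 and x1Size is 1, so a one-partition input stays at one partition. With 400, the same input would be reshuffled to 380 partitions. Whether this code path is actually reached by the ABt multiply discussed in this thread is not established here.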
On Oct 13, 2014, at 8:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
Hi,
I am currently testing SimilarityAnalysis.rowSimilarity and I am wondering, how could I increase number of tasks to use for distributed shuffle.
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Reinis Vicups <ma...@orbit-x.de>.
Of course the number of partitions/tasks should be configurable; I am
just saying that in my experiments I have observed a close-to-linear
performance increase just by increasing the number of partitions/tasks
(which was absolutely not the case with MapReduce).
I am assuming that Spark is not "smart" enough to set optimal values for
the parallelism. I recall reading somewhere that the default is the number
of CPUs or 2, whichever is larger. Because of the nature of tasks (if I am
not mistaken, those are wrapped Akka actors), it is possible to
efficiently execute a much higher number of tasks per CPU. The Spark tuning
guide recommends 2-3 tasks per CPU core:
http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism
but I have sometimes observed considerable performance gains when increasing
the number of tasks to 8 or even 16 per CPU core.
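The arithmetic behind these rules of thumb is simple enough to write down. The sketch below assumes Spark's documented cluster-mode default for spark.default.parallelism (total cores on all executor nodes or 2, whichever is larger). It also assumes the "4-node-2cpu" cluster mentioned in this thread has 8 executor cores in total, which may not match the real hardware. The object and method names are hypothetical.

```scala
object ParallelismArithmetic {
  // Spark's documented cluster-mode default for spark.default.parallelism:
  // total cores on all executor nodes or 2, whichever is larger.
  def defaultParallelism(totalExecutorCores: Int): Int = math.max(totalExecutorCores, 2)

  // Task count for a chosen tasks-per-core ratio.
  def tasksFor(totalExecutorCores: Int, tasksPerCore: Int): Int = totalExecutorCores * tasksPerCore

  def main(args: Array[String]): Unit = {
    val cores = 4 * 2 // assumption: 4 nodes x 2 cores each
    println(s"default: ${defaultParallelism(cores)}")
    for (ratio <- Seq(2, 3, 8, 16))
      println(s"$ratio tasks/core -> ${tasksFor(cores, ratio)} tasks")
    println(s"400 tasks on $cores cores = ${400 / cores} tasks/core")
  }
}
```

Under these assumptions the default would be 8 tasks, the tuning guide's 2-3 per core gives 16-24, 8-16 per core gives 64-128, and the 400 tasks used here work out to 50 per core. The same arithmetic gives 56 or 112 for 7 local cores.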
On 13.10.2014 18:53, Pat Ferrel wrote:
> There is a possibility that we are doing something with partitioning that interferes but I think Ted’s point is that Spark should do the right thing in most cases—unless we interfere. Those values are meant for tuning to the exact job you are doing, but it may not be appropriate for us to hard code them. We could allow the CLI to set them like we do with -sem if needed.
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Pat Ferrel <pa...@occamsmachete.com>.
There is a possibility that we are doing something with partitioning that interferes, but I think Ted’s point is that Spark should do the right thing in most cases, unless we interfere. Those values are meant for tuning to the exact job you are doing, so it may not be appropriate for us to hard-code them. We could allow the CLI to set them, as we do with -sem, if needed.
Let’s see what Dmitriy thinks about why only one task is being created.
On Oct 13, 2014, at 9:32 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
Hi,
> Do you think that simply increasing this parameter is a safe and sane thing
> to do?
Why would it be unsafe?
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Pat Ferrel <pa...@gmail.com>.
I’ll add a new option to escape any Spark options and put them directly into the SparkConf for the job before the context is created.
The CLI syntax will be something like -D xxx=yyy, so in this case you can change the default parallelism with
-D spark.default.parallelism=400
If the logic holds that you can often use 8 to 16 times your number of cores, then running locally on my laptop with local[7] should use -D spark.default.parallelism=56 or 112.
If you want this value set for your entire cluster, you should be able to set it in the conf files when you launch the cluster. We don’t change any of those values in the client except spark.executor.memory (only if specified) and any escaped values.
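One way such a pass-through option could work is sketched below: collect every "-D key=value" pair from the argument list and let those pairs override the driver's own settings. This is a hypothetical sketch, not Mahout's actual CLI code. The object, method names and the "--input tfidf-vectors" arguments are made up, and a plain Map stands in for SparkConf. In real driver code each resulting pair could be applied with SparkConf.set(key, value) before the SparkContext is created.

```scala
object PassThroughOptionsSketch {
  // Hypothetical parser for repeated "-D key=value" arguments; not Mahout's actual CLI code.
  def parseDOptions(args: Seq[String]): Map[String, String] =
    args.sliding(2).collect {
      case Seq("-D", kv) if kv.contains("=") =>
        val idx = kv.indexOf('=')
        kv.substring(0, idx) -> kv.substring(idx + 1) // split on the first '=' only
    }.toMap

  // Pass-through values override whatever the driver would otherwise set.
  def merge(driverDefaults: Map[String, String], passThrough: Map[String, String]): Map[String, String] =
    driverDefaults ++ passThrough

  def main(args: Array[String]): Unit = {
    val cli = Seq("--input", "tfidf-vectors", "-D", "spark.default.parallelism=400") // hypothetical args
    val conf = merge(Map("spark.kryoserializer.buffer.mb" -> "200"), parseDOptions(cli))
    conf.toSeq.sorted.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```

Splitting on the first '=' only keeps values that themselves contain '=' intact. Letting escaped values win over driver defaults matches the intent that only explicitly passed settings change the client's configuration.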
On Oct 13, 2014, at 11:32 AM, Ted Dunning <te...@gmail.com> wrote:
Splitting things too far can make processes much less efficient. Setting
parameters like this may propagate further than desired.
I asked because I don't know, however.
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Ted Dunning <te...@gmail.com>.
On Mon, Oct 13, 2014 at 12:32 PM, Reinis Vicups <ma...@orbit-x.de> wrote:
>> Do you think that simply increasing this parameter is a safe and sane thing
>> to do?
>
> Why would it be unsafe?
>
> In my own implementation I am using 400 tasks on my 4-node-2cpu cluster
> and the execution times of largest shuffle stage have dropped around 10
> times.
> I have number of test values back from the time when I used "old"
> RowSimilarityJob and with some exceptions (I guess due to randomized
> sparsization) I still have approx. the same values with my own row
> similarity implementation.
>
Splitting things too far can make processes much less efficient. Setting
parameters like this may propagate further than desired.
I asked because I don't know, however.
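Ted's caution that splitting too far can hurt efficiency can be made concrete with a deliberately simple cost model. It is an assumption for illustration, not how Spark actually schedules or costs tasks. Work is split evenly over the tasks, tasks run in waves of one per core, and every task pays a fixed overhead. The 1200 s of work and 0.5 s per-task overhead are made-up inputs. The model ignores data skew, memory pressure and spilling, which are common reasons why more, smaller tasks help in practice, so it only shows the overhead side of the trade-off.

```scala
object TaskGranularityModel {
  // Toy cost model (an assumption for illustration, not how Spark schedules or costs tasks):
  // the stage's work splits evenly over `tasks`, tasks run in waves of `cores`,
  // and every task pays a fixed per-task overhead.
  def stageSeconds(tasks: Int, cores: Int, totalWorkSeconds: Double, overheadPerTask: Double): Double = {
    require(tasks > 0 && cores > 0, "tasks and cores must be positive")
    val waves = math.ceil(tasks.toDouble / cores)
    waves * (totalWorkSeconds / tasks + overheadPerTask)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative inputs: 8 cores, 1200 s of total work, 0.5 s overhead per task.
    for (t <- Seq(1, 8, 64, 400, 10000))
      println(f"$t%6d tasks -> ${stageSeconds(t, 8, 1200.0, 0.5)}%8.1f s")
  }
}
```

In this model, going from 1 task to one task per core cuts the stage time sharply. Pushing to thousands of tiny tasks makes it slower again because the fixed per-task overhead dominates.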
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Reinis Vicups <ma...@orbit-x.de>.
Hi,
>Do you think that simply increasing this parameter is a safe and sane thing
>to do?
Why would it be unsafe?
In my own implementation I am using 400 tasks on my 4-node, 2-CPU cluster,
and the execution time of the largest shuffle stage has dropped by a factor
of about 10.
I have a number of test values from back when I used the "old"
RowSimilarityJob, and with some exceptions (I guess due to randomized
sparsification) I still get approximately the same values with my own row
similarity implementation.
reinis
On 13.10.2014 18:06, Ted Dunning wrote:
> Thank you for YOUR help!
>
> Do you think that simply increasing this parameter is a safe and sane thing
> to do?
>
Re: Mahout 1.0: parallelism/number tasks during SimilarityAnalysis.rowSimilarity
Posted by Ted Dunning <te...@gmail.com>.
On Mon, Oct 13, 2014 at 11:56 AM, Reinis Vicups <ma...@orbit-x.de> wrote:
> I have my own implementation of SimilarityAnalysis and by tuning number of
> tasks I have reached HUGE performance gains.
>
> Since I couldn't find how to pass the number of tasks to shuffle
> operations directly, I have set following in spark config
>
> configuration = new SparkConf().setAppName(jobConfig.jobName)
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
> .set("spark.kryo.referenceTracking", "false")
> .set("spark.kryoserializer.buffer.mb", "200")
> .set("spark.default.parallelism", 400) // <- this is the line supposed to set default parallelism to some high number
>
> Thank you for your help
>
Thank you for YOUR help!
Do you think that simply increasing this parameter is a safe and sane thing
to do?