Posted to user@spark.apache.org by SK <sk...@gmail.com> on 2014/09/02 20:24:58 UTC

mllib performance on cluster

Hi,

I evaluated the runtime performance of some of the MLlib classification
algorithms on a local machine and a cluster with 10 nodes. I used standalone
mode and Spark 1.0.1 in both cases. Here are the results for the total
runtime:
                          Local        Cluster
Logistic regression       138 sec      336 sec
SVM                       138 sec      336 sec
Decision tree              50 sec      132 sec

My dataset is quite small and my programs are very similar to the MLlib
examples that are included in the Spark distribution. Why is the runtime on
the cluster significantly higher (almost 3 times) than on the local machine,
even though the former uses more memory and more nodes? Is it because of the
communication overhead on the cluster? I would like to know if there is
something I should be doing to optimize performance on the cluster, or
whether others have been getting similar results as well.

thanks
   






Re: Spark streaming for synchronous API

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi again,

On Tue, Sep 9, 2014 at 2:20 PM, Tobias Pfeiffer <tg...@preferred.jp> wrote:
>
> On Tue, Sep 9, 2014 at 2:02 PM, Ron's Yahoo! <zl...@yahoo.com> wrote:
>>
>>   For example, let’s say there’s a particular topic T1 in a Kafka queue.
>> If I have a new set of requests coming from a particular client A, I was
>> wondering if I could create a partition A.
>>   The streaming job is submitted to listen to T1.A and will write to a
>> topic T2.A, which the REST endpoint would be listening on.
>>
>
> That doesn't seem like a good way to use Kafka. It may be possible, but I
> am pretty sure you should create a new topic T_A instead of a partition A
> in an existing topic. With some modifications of Spark Streaming's
> KafkaReceiver you *might* be able to get it to work as you imagine, but it
> was not meant to be that way, I think.
>

Maybe I was wrong about a new topic being the better way. Looking, for
example, at the way that Samza consumes Kafka streams <
http://samza.incubator.apache.org/learn/documentation/latest/introduction/concepts.html>,
it seems like there is one task per partition and data can go into
partitions keyed by user ID. So maybe a new partition is actually the
conceptually better way.

Nonetheless, the built-in KafkaReceiver doesn't support assignment of
partitions to receivers AFAIK ;-)
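For what it's worth, here is a minimal sketch of the producing side, against
the old Scala producer API from the Kafka 0.8 era (the broker address, topic
and key below are made up). With the default partitioner the message key
decides the partition, so keying by client id gives you per-client partitions:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092")               // placeholder broker
props.put("serializer.class", "kafka.serializer.StringEncoder")

val producer = new Producer[String, String](new ProducerConfig(props))

// With the default partitioner, the key ("clientA") is hashed to pick the
// partition, so all of client A's requests land in the same partition of T1.
producer.send(new KeyedMessage[String, String]("T1", "clientA", "some request payload"))

producer.close()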

Tobias

Re: Spark streaming for synchronous API

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Tue, Sep 9, 2014 at 2:02 PM, Ron's Yahoo! <zl...@yahoo.com> wrote:

>   So I guess where I was coming from was the assumption that starting up a
> new job to be listening on a particular queue topic could be done
> asynchronously.
>

No, with the current state of Spark Streaming, all data sources and the
processing pipeline must be fixed when you start your StreamingContext. You
cannot add new data sources dynamically at the moment, see
http://apache-spark-user-list.1001560.n3.nabble.com/Multi-tenancy-for-Spark-Streaming-Applications-td13398.html
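To make that concrete, here is a minimal sketch of a Spark 1.x streaming job
(assuming the spark-streaming-kafka artifact is on the classpath; the ZooKeeper
address, topic and batch interval are made up). Everything has to be wired up
before ssc.start():

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("FixedPipeline")
val ssc = new StreamingContext(conf, Seconds(1))   // batch interval is fixed here

// All input streams must be declared up front; a topic or partition for a
// new client cannot be added once the context is running.
val clientA = KafkaUtils.createStream(ssc, "zkhost:2181", "rest-consumers", Map("T1.A" -> 1))

// The processing pipeline is fixed at this point as well.
clientA.map { case (_, value) => value.toUpperCase }.print()

ssc.start()            // from here on, neither sources nor pipeline can change
ssc.awaitTermination()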


>   For example, let’s say there’s a particular topic T1 in a Kafka queue.
> If I have a new set of requests coming from a particular client A, I was
> wondering if I could create a partition A.
>   The streaming job is submitted to listen to T1.A and will write to a
> topic T2.A, which the REST endpoint would be listening on.
>

That doesn't seem like a good way to use Kafka. It may be possible, but I
am pretty sure you should create a new topic T_A instead of a partition A
in an existing topic. With some modifications of Spark Streaming's
KafkaReceiver you *might* be able to get it to work as you imagine, but it
was not meant to be that way, I think.

Also, you will not get "low latency", because Spark Streaming processes
data in batches of fixed interval length (say, 1 second) and in the worst
case your query will wait up to 1 second before processing even starts.

If I understand correctly what you are trying to do (which I am not sure
about), I would probably recommend choosing a somewhat different
architecture, in particular given that you cannot dynamically add data
sources.

Tobias

Re: Spark streaming for synchronous API

Posted by Ron's Yahoo! <zl...@yahoo.com.INVALID>.
Hi Tobias,
  So I guess where I was coming from was the assumption that starting up a new job to be listening on a particular queue topic could be done asynchronously.
  For example, let’s say there’s a particular topic T1 in a Kafka queue. If I have a new set of requests coming from a particular client A, I was wondering if I could create a partition A.
  The streaming job is submitted to listen to T1.A and will write to a topic T2.A, which the REST endpoint would be listening on.
  It does seem a little contrived, but the ultimate goal here is to get a bunch of messages from a queue, distribute them to a bunch of Spark jobs that process them and write back to another queue, which the REST endpoint synchronously waits on. Storm might be a better fit, but the background behind this question is that I want to reuse the same set of transformations for both batch and streaming, with the streaming use case represented by a REST call.
  In other words, the job submission would not be part of the equation, so I would imagine the latency is limited to the processing, the write-back, and the consumption of the processed message by the original REST request.
  Let me know what you think…

Thanks,
Ron

On Sep 8, 2014, at 9:28 PM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Hi,
> 
> On Tue, Sep 9, 2014 at 12:59 PM, Ron's Yahoo! <zl...@yahoo.com> wrote:
>  I want to create a synchronous REST API that will process some data that is passed in as some request.
>  I would imagine that the Spark Streaming Job on YARN is a long running job that waits on requests from something. What that something is is still not clear to me, but I would imagine that it’s some queue. The goal is to be able to push a message onto a queue with some id, and then  get the processed results back from Spark Streaming.
> 
> That is not exactly a Spark Streaming use case, I think. Spark Streaming pulls data from some source (like a queue), then processes all data collected in a certain interval in a mini-batch, and stores that data somewhere. It is not well suited for handling request-response cycles in a synchronous way; you might consider using plain Spark (without Streaming) for that.
> 
> For example, you could use the unfiltered http://unfiltered.databinder.net/Unfiltered.html library and within request handling do some RDD operation, returning the output as HTTP response. This works fine as multiple threads can submit Spark jobs concurrently https://spark.apache.org/docs/latest/job-scheduling.html You could also check https://github.com/adobe-research/spindle -- that seems to be similar to what you are doing.
> 
>  The goal is for the REST API be able to respond to lots of calls with low latency.
>  Hope that clarifies things...
> 
> Note that "low latency" for "lots of calls" is maybe not something that Spark was built for. Even if you do close to nothing data processing, you may not get below 200ms or so due to the overhead of submitting jobs etc., from my experience.
> 
> Tobias
> 
> 


Re: Spark streaming for synchronous API

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Tue, Sep 9, 2014 at 12:59 PM, Ron's Yahoo! <zl...@yahoo.com> wrote:
>
>  I want to create a synchronous REST API that will process some data that
> is passed in as some request.
>  I would imagine that the Spark Streaming Job on YARN is a long
> running job that waits on requests from something. What that something is
> is still not clear to me, but I would imagine that it’s some queue.
> The goal is to be able to push a message onto a queue with some id, and
> then  get the processed results back from Spark Streaming.
>

That is not exactly a Spark Streaming use case, I think. Spark Streaming
pulls data from some source (like a queue), then processes all data
collected in a certain interval in a mini-batch, and stores that data
somewhere. It is not well suited for handling request-response cycles in a
synchronous way; you might consider using plain Spark (without Streaming)
for that.

For example, you could use the unfiltered
http://unfiltered.databinder.net/Unfiltered.html library and within request
handling do some RDD operation, returning the output as HTTP response. This
works fine as multiple threads can submit Spark jobs concurrently
https://spark.apache.org/docs/latest/job-scheduling.html You could also
check https://github.com/adobe-research/spindle -- that seems to be similar
to what you are doing.
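As a very rough sketch of that pattern (using the JDK's built-in HTTP server
instead of unfiltered just to keep it self-contained; the data path and the
endpoint are made up), each request simply runs an RDD operation and returns
the result:

import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.{SparkConf, SparkContext}

object SyncQueryServer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SyncQuery"))
    val events = sc.textFile("hdfs:///data/events").cache()   // placeholder path

    val server = HttpServer.create(new InetSocketAddress(8080), 0)
    server.createContext("/count", new HttpHandler {
      override def handle(ex: HttpExchange): Unit = {
        val term = Option(ex.getRequestURI.getQuery).getOrElse("")
        // Each request triggers a Spark job; concurrent requests are fine,
        // since multiple threads may submit jobs to the same SparkContext.
        val n = events.filter(_.contains(term)).count()
        val body = s"""{"count": $n}""".getBytes("UTF-8")
        ex.sendResponseHeaders(200, body.length)
        ex.getResponseBody.write(body)
        ex.close()
      }
    })
    server.start()
  }
}

If you enable the fair scheduler (see the job scheduling page above), short
requests are less likely to get stuck behind long-running ones.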

> The goal is for the REST API to be able to respond to lots of calls with low
> latency.
>  Hope that clarifies things...
>

Note that "low latency" for "lots of calls" is maybe not something that
Spark was built for. Even if you do close to no data processing, you
may not get below 200ms or so due to the overhead of submitting jobs etc.,
from my experience.
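If you want to see that floor for yourself, here is a quick and rough
measurement from the shell (numbers will vary a lot between setups):

// Time a near-empty job a few times; the result is roughly the fixed
// scheduling/submission overhead per job, independent of data size.
for (i <- 1 to 5) {
  val t0 = System.currentTimeMillis
  sc.parallelize(1 to 1, 1).count()
  println(s"run $i: ${System.currentTimeMillis - t0} ms")
}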

Tobias

Re: Spark streaming for synchronous API

Posted by Ron's Yahoo! <zl...@yahoo.com.INVALID>.
Tobias,
 Let me explain a little more.
 I want to create a synchronous REST API that will process some data that is passed in as some request.
 I would imagine that the Spark Streaming Job on YARN is a long-running job that waits on requests from something. What that something is is still not clear to me, but I would imagine that it’s some queue. The goal is to be able to push a message onto a queue with some id, and then get the processed results back from Spark Streaming.
 The goal is for the REST API to be able to respond to lots of calls with low latency.
 Hope that clarifies things...

Thanks,
Ron

On Sep 8, 2014, at 7:41 PM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Ron,
> 
> On Tue, Sep 9, 2014 at 11:27 AM, Ron's Yahoo! <zl...@yahoo.com.invalid> wrote:
>   I’m trying to figure out how I can run Spark Streaming like an API.
>   The goal is to have a synchronous REST API that runs the spark data flow on YARN.
> 
> I guess I *may* develop something similar in the future.
> 
> By "a synchronous REST API", do you mean that submitting the job is synchronous and you would fetch the processing results via a different call? Or do you want to submit a job and get the processed data back as an HTTP stream?
> 
> To begin with, is it even possible to have Spark Streaming run as a yarn job?
> 
> I think it is very much possible to run Spark Streaming as a YARN job; at least it worked well with Mesos.
> 
> Tobias
> 


Re: Spark streaming for synchronous API

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Ron,

On Tue, Sep 9, 2014 at 11:27 AM, Ron's Yahoo! <zl...@yahoo.com.invalid>
 wrote:
>
>   I’m trying to figure out how I can run Spark Streaming like an API.
>   The goal is to have a synchronous REST API that runs the spark data flow
> on YARN.


I guess I *may* develop something similar in the future.

By "a synchronous REST API", do you mean that submitting the job is
synchronous and you would fetch the processing results via a different
call? Or do you want to submit a job and get the processed data back as an
HTTP stream?

> To begin with, is it even possible to have Spark Streaming run as a yarn
> job?
>

I think it is very much possible to run Spark Streaming as a YARN job; at
least it worked well with Mesos.

Tobias

Spark streaming for synchronous API

Posted by Ron's Yahoo! <zl...@yahoo.com.INVALID>.
Hi,
  I’m trying to figure out how I can run Spark Streaming like an API.
  The goal is to have a synchronous REST API that runs the spark data flow on YARN.
  Has anyone done something like this? Can you share your architecture? To begin with, is it even possible to have Spark Streaming run as a yarn job?

Thanks,
Ron


Re: mllib performance on cluster

Posted by "Evan R. Sparks" <ev...@gmail.com>.
I spoke with SK offline about this; it looks like the difference in timings
came from the fact that he was training 100 models, each for 100 iterations,
and taking the total time (vs. my example, which trains a single model for 100
iterations). I'm posting my response here, though, because I think it's
worth documenting:

Benchmarking on a dataset this small on this many cores is probably not
going to give you any meaningful information about how the algorithms scale
to "real" data problems.

In this case, you've thrown 200 cores at 5.6 KB of data - 200
low-dimensional data points. The overheads of scheduling tasks, sending
them out to each worker, and network latencies between the nodes, which are
essentially fixed regardless of problem size, are COMPLETELY dominating the
time spent computing - which in the first two cases is 9-10 flops per data
point and in the last case is a couple of array lookups and adds per data
point.

It would make a lot more sense to find or generate a dataset that's 10 or
100 GB and see how performance scales there. You can do this with the code I
pasted earlier - just change the number of examples, the number of features,
and the number of partitions (the second, third, and fifth arguments) to
values appropriate for your problem, with the partition count matching the
number of cores you have on your cluster.
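Something along these lines, for example (the sizes are illustrative, not
tuned - adjust them to your cluster's memory budget):

import org.apache.spark.mllib.util.LogisticRegressionDataGenerator
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// ~10M examples x 100 features in 320 partitions (one per core in your cluster).
val big = LogisticRegressionDataGenerator.generateLogisticRDD(sc, 10000000, 100, 1e-4, 320, 0.2).cache()
big.count()   // materialize the cache before timing

val start = System.currentTimeMillis
val model = LogisticRegressionWithSGD.train(big, 100)
println(s"training took ${System.currentTimeMillis - start} ms")

At that scale the per-iteration computation should start to dominate the fixed
scheduling overhead.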

In short, don't use a cluster unless you need one :).

Hope this helps!


On Tue, Sep 2, 2014 at 3:51 PM, SK <sk...@gmail.com> wrote:

> The dataset is quite small : 5.6 KB.  It has 200 rows and 3 features, and 1
> column of labels.  From this dataset, I split 80% for training set and 20%
> for test set. The features are integer counts and labels are binary (1/0).
>
> thanks
>

Re: mllib performance on cluster

Posted by "Evan R. Sparks" <ev...@gmail.com>.
Hmm... something is fishy here.

That's a *really* small dataset for a Spark job, so almost all of your time
will be spent in scheduling overhead, but even so you should be able to train a
logistic regression model with the default options and 100 iterations in
<1s on a single machine.

Are you caching your dataset before training the classifier on it? It's
possible that you're re-reading it from disk (or across the internet, maybe)
on every iteration.

From spark-shell:

import org.apache.spark.mllib.util.LogisticRegressionDataGenerator

// 200 examples, 3 features, 4 partitions; cached so iterations read from memory.
val dat = LogisticRegressionDataGenerator.generateLogisticRDD(sc, 200, 3, 1e-4, 4, 0.2).cache()

println(dat.count()) // should give 200

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Kept on one line so that REPL typing time isn't part of the measurement.
val start = System.currentTimeMillis; val model = LogisticRegressionWithSGD.train(dat, 100); val delta = System.currentTimeMillis - start

println(delta) // On my laptop, 863ms.

On Tue, Sep 2, 2014 at 3:51 PM, SK <sk...@gmail.com> wrote:

> The dataset is quite small : 5.6 KB.  It has 200 rows and 3 features, and 1
> column of labels.  From this dataset, I split 80% for training set and 20%
> for test set. The features are integer counts and labels are binary (1/0).
>
> thanks
>

Re: mllib performance on cluster

Posted by SK <sk...@gmail.com>.
The dataset is quite small: 5.6 KB. It has 200 rows, 3 features, and 1
column of labels. From this dataset, I split 80% for the training set and 20%
for the test set. The features are integer counts and the labels are binary (1/0).
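For reference, the split follows the BinaryClassification example, roughly
along these lines (the input path below is just a placeholder):

import org.apache.spark.mllib.util.MLUtils

val examples = MLUtils.loadLibSVMFile(sc, "path/to/dataset").cache()
val splits = examples.randomSplit(Array(0.8, 0.2), seed = 11L)
val training = splits(0).cache()   // 80% for training
val test = splits(1).cache()       // 20% for test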

thanks





Re: mllib performance on cluster

Posted by Bharath Mundlapudi <mu...@gmail.com>.
Those are interesting numbers. You haven't mentioned the dataset size in
your thread. This is a classic scalability and performance question,
assuming your baseline numbers are correct and you have tuned everything on
your cluster correctly.

Putting on my outside-observer cap, there are multiple possible reasons for
this; we need to look at all of these:
1. This could be an algorithmic cost when we move to a cluster
2. This could be a scalability cost
3. The cluster is not tuned well
4. Indeed, there is a problem/performance regression in the framework.

On Tue, Sep 2, 2014 at 1:12 PM, SK <sk...@gmail.com> wrote:

> NUm Iterations: For  LR and SVM, I am using the default value of 100.  All
> the other parameters also I am using the default values.  I am pretty much
> reusing the code from BinaryClassification.scala.  For Decision Tree, I
> dont
> see any parameter for number of iterations inthe example code, so I did not
> specify any. I am running each algorithm on my dataset 100 times and taking
> the average runtime.
>
> MY dataset is very dense (hardly any zeros). The labels are 1 and 0.
>
> I did not explicity specify the number of partitions. I did not see any
> code
> for this in the MLLib examples for BinaryClassification and DecisionTree.
>
> hardware:
> local: intel core i7 with 12 cores and 7.8 GB of which I am allocating 4GB
> for the executor memory. According to the application detail stats in the
> spark UI, the total memory consumed is around 1.5 GB.
>
> cluster: 10 nodes with a total of 320 cores, with 16GB per node. According
> to the application detail stats in the spark UI, the total memory consumed
> is around 95.5 GB.
>

Re: mllib performance on cluster

Posted by SK <sk...@gmail.com>.
Num iterations: For LR and SVM, I am using the default value of 100. For all
the other parameters I am also using the default values. I am pretty much
reusing the code from BinaryClassification.scala. For Decision Tree, I don't
see any parameter for the number of iterations in the example code, so I did
not specify any. I am running each algorithm on my dataset 100 times and
taking the average runtime.
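Roughly along these lines, i.e. timing 100 complete training runs and
averaging ("training" stands for my cached training RDD):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// 100 end-to-end training runs, each doing 100 SGD iterations internally,
// then the average wall-clock time per run.
val times = for (_ <- 1 to 100) yield {
  val t0 = System.currentTimeMillis
  val model = LogisticRegressionWithSGD.train(training, 100)
  System.currentTimeMillis - t0
}
println(s"average: ${times.sum / times.length} ms")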

My dataset is very dense (hardly any zeros). The labels are 1 and 0.

I did not explicitly specify the number of partitions. I did not see any code
for this in the MLlib examples for BinaryClassification and DecisionTree.

Hardware:
Local: Intel Core i7 with 12 cores and 7.8 GB of memory, of which I am
allocating 4 GB for the executor memory. According to the application detail
stats in the Spark UI, the total memory consumed is around 1.5 GB.

Cluster: 10 nodes with a total of 320 cores and 16 GB per node. According
to the application detail stats in the Spark UI, the total memory consumed
is around 95.5 GB.






Re: mllib performance on cluster

Posted by "Evan R. Sparks" <ev...@gmail.com>.
Also - what hardware are you running the cluster on? And what is the local
machine hardware?


On Tue, Sep 2, 2014 at 11:57 AM, Evan R. Sparks <ev...@gmail.com>
wrote:

> How many iterations are you running? Can you provide the exact details
> about the size of the dataset? (how many data points, how many features) Is
> this sparse or dense - and for the sparse case, how many non-zeroes? How
> many partitions is your data RDD?
>
> For very small datasets the scheduling overheads of shipping tasks across
> the cluster and delays due to stragglers can dominate the time actually
> doing your parallel computation. If you have too few partitions, you won't
> be taking advantage of cluster parallelism, and if you have too many you're
> introducing even more of the aforementioned overheads.
>
>
>
> On Tue, Sep 2, 2014 at 11:24 AM, SK <sk...@gmail.com> wrote:
>
>> Hi,
>>
>> I evaluated the runtime performance of some of the MLlib classification
>> algorithms on a local machine and a cluster with 10 nodes. I used
>> standalone
>> mode and Spark 1.0.1 in both cases. Here are the results for the total
>> runtime:
>>                                    Local             Cluster
>> Logistic regression       138 sec          336 sec
>> SVM                           138 sec          336 sec
>> Decision tree                 50 sec         132 sec
>>
>> My dataset is quite small and my programs are very similar to the mllib
>> examples that are included in the Spark distribution. Why is the runtime
>> on
>> the cluster significantly higher (almost 3 times) than that on the local
>> machine even though the former uses more memory and more nodes? Is it
>> because of the communication overhead on the cluster? I would like to know
>> if there is something I need to be doing to optimize the performance on
>> the
>> cluster or if others have also been getting similar results.
>>
>> thanks
>>
>

Re: mllib performance on cluster

Posted by "Evan R. Sparks" <ev...@gmail.com>.
How many iterations are you running? Can you provide the exact details
about the size of the dataset? (how many data points, how many features) Is
this sparse or dense - and for the sparse case, how many non-zeroes? How
many partitions is your data RDD?

For very small datasets the scheduling overheads of shipping tasks across
the cluster and delays due to stragglers can dominate the time actually
doing your parallel computation. If you have too few partitions, you won't
be taking advantage of cluster parallelism, and if you have too many you're
introducing even more of the aforementioned overheads.
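For example, here is a minimal sketch (the path and the partition count are
placeholders) that sets the partition count explicitly and caches before
training:

import org.apache.spark.mllib.util.MLUtils

// Repartition to roughly one partition per core and cache, so every SGD
// iteration reads from memory instead of re-reading the input.
val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/train.libsvm")
  .repartition(sc.defaultParallelism)
  .cache()
println(data.partitions.length)   // how many partitions the training RDD has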



On Tue, Sep 2, 2014 at 11:24 AM, SK <sk...@gmail.com> wrote:

> Hi,
>
> I evaluated the runtime performance of some of the MLlib classification
> algorithms on a local machine and a cluster with 10 nodes. I used
> standalone
> mode and Spark 1.0.1 in both cases. Here are the results for the total
> runtime:
>                                    Local             Cluster
> Logistic regression       138 sec          336 sec
> SVM                           138 sec          336 sec
> Decision tree                 50 sec         132 sec
>
> My dataset is quite small and my programs are very similar to the mllib
> examples that are included in the Spark distribution. Why is the runtime on
> the cluster significantly higher (almost 3 times) than that on the local
> machine even though the former uses more memory and more nodes? Is it
> because of the communication overhead on the cluster? I would like to know
> if there is something I need to be doing to optimize the performance on the
> cluster or if others have also been getting similar results.
>
> thanks
>