Posted to user@spark.apache.org by Sujit Pal <su...@gmail.com> on 2015/08/02 19:17:09 UTC

Re: How to increase parallelism of a Spark cluster?

No one has any ideas?

Is there some more information I should provide?

I am looking for ways to increase the parallelism among workers. Currently
I see only as many simultaneous connections to Solr as there are workers.
My number of partitions is about 2.5x the number of workers, and the
workers seem to be large enough to handle more than one task at a time.

I am creating a single client per partition in my mapPartitions call. I am
not sure whether that is the gating factor. Perhaps I should use a pool of
clients instead?
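
For concreteness, a minimal sketch of the alternative I have in mind: one
lazily created SolrJ client per executor JVM, shared by every partition that
executor processes (this uses the HttpSolrServer name from SolrJ 4.x, which I
believe is thread-safe; the object name and Solr URL below are made up):

import org.apache.solr.client.solrj.impl.HttpSolrServer

object SharedSolr {
  // One client per executor JVM, created on first use and reused by all
  // tasks running in this JVM.
  lazy val server: HttpSolrServer = {
    val s = new HttpSolrServer("http://solr-host:8983/solr/collection1")
    initializeSolrParameters(s)  // same init helper as in getResults()
    s
  }
}

def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] =
  keyValues.map(kv => (kv._1, process(SharedSolr.server, kv)))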

Would really appreciate some pointers.

Thanks in advance for any help you can provide.

-sujit


On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal <su...@gmail.com> wrote:

> Hello,
>
> I am trying to run a Spark job that hits an external webservice to get
> back some information. The cluster is 1 master + 4 workers, each worker has
> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
> and is accessed using code similar to that shown below.
>
> def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
>   val solr = new HttpSolrClient()
>   initializeSolrParameters(solr)
>   keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
> }
>
> myRDD.repartition(10)
>   .mapPartitions(keyValues => getResults(keyValues))
>
> The mapPartitions call initializes the SolrJ client once per partition and
> then uses it for each record in the partition via the getResults() call.
>
> I repartitioned in the hope that this would result in 10 clients hitting
> Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
> clients if I can). However, I counted the number of open connections using
> netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and
> observed that Solr has a constant 4 connections (i.e., equal to the number
> of workers) over the lifetime of the run.
>
> My observation leads me to believe that each worker processes a single
> stream of work sequentially. However, from what I understand about how
> Spark works, each worker should be able to process multiple tasks in
> parallel, and repartition() is a hint for it to do so.
>
> Is there some SparkConf setting or environment variable I should set to
> increase parallelism on these workers, or should I just configure a cluster
> with multiple workers per machine? Or is there something I am doing wrong?
>
> Thank you in advance for any pointers you can provide.
>
> -sujit
>
>

Re: How to increase parallelism of a Spark cluster?

Posted by Igor Berman <ig...@gmail.com>.
So how many cores do you configure per node?
Do you have something like --total-executor-cores or maybe a --num-executors
config? (I'm not sure what kind of cluster the Databricks platform provides;
if it's standalone, then the first option should be used.) If you have 4
cores in total, then even though you have 4 cores per machine, only one is
working on each machine... which could be the cause.
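
For illustration, a minimal sketch of the standalone-mode settings that would
let all 16 cores (4 workers x 4 cores) run tasks at once; the values are made
up, and on Databricks the equivalent knobs are normally set on the cluster
configuration rather than in code:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("solr-enrichment")        // hypothetical app name
  .set("spark.cores.max", "16")         // standalone mode: total cores for this app
  .set("spark.executor.memory", "48g")  // illustrative; leaves headroom on a 60GB worker
val sc = new SparkContext(conf)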
Another option: you are hitting some default configuration limiting the
number of concurrent connections per route or the max total connections
from the JVM. Look at
https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
(assuming you are using HttpClient 4.x and not 3.x); I'm not sure what the
defaults are...
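
If that limit is what is being hit, one possible workaround (just a sketch,
assuming SolrJ 4.x and HttpClient 4.x; the URL and numbers are illustrative)
is to build an HttpClient with a larger connection pool and hand it to the
SolrJ client:

import org.apache.http.impl.client.HttpClients
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.solr.client.solrj.impl.HttpSolrServer

// Raise the per-route and total connection limits of the underlying HttpClient.
val cm = new PoolingHttpClientConnectionManager()
cm.setMaxTotal(40)            // all routes combined
cm.setDefaultMaxPerRoute(40)  // the single Solr host counts as one route
val httpClient = HttpClients.custom().setConnectionManager(cm).build()

// HttpSolrServer accepts a pre-built HttpClient in its constructor.
val solr = new HttpSolrServer("http://solr-host:8983/solr/collection1", httpClient)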



On 2 August 2015 at 23:42, Sujit Pal <su...@gmail.com> wrote:

> Hi Igor,
>
> The cluster is a Databricks Spark cluster. It consists of 1 master + 4
> workers, each worker has 60GB RAM and 4 CPUs. The original mail has some
> more details (also, the reference to HttpSolrClient in there should be
> HttpSolrServer; sorry about that, a mistake while writing the email).
>
> There is no additional configuration on the external Solr host from my
> code; I am using the default HttpClient provided by HttpSolrServer.
> According to the Javadocs, you can pass in an HttpClient object as well. Is
> there some specific configuration you would suggest to get past any limits?
>
> On another project, I faced a similar problem but had more leeway (I was
> using a Spark cluster on EC2) and less time; my workaround was to use
> Python multiprocessing to create a program that started up 30 Python
> JSON/HTTP clients and wrote output into 30 output files, which were then
> processed by Spark. The reason I mention this is that I was using default
> configurations there as well; I just needed to increase the number of
> connections against Solr to a higher number.
>
> This time round, I would like to do this through Spark because it makes
> the pipeline less complex.
>
> -sujit
>
>
> On Sun, Aug 2, 2015 at 10:52 AM, Igor Berman <ig...@gmail.com>
> wrote:
>
>> What kind of cluster? How many cores on each worker? Is there a config for
>> the HTTP Solr client? I remember the standard HttpClient has a limit per route/host.
>> On Aug 2, 2015 8:17 PM, "Sujit Pal" <su...@gmail.com> wrote:
>>
>>> No one has any ideas?
>>>
>>> Is there some more information I should provide?
>>>
>>> I am looking for ways to increase the parallelism among workers.
>>> Currently I see only as many simultaneous connections to Solr as there
>>> are workers. My number of partitions is about 2.5x the number of workers,
>>> and the workers seem to be large enough to handle more than one task at a
>>> time.
>>>
>>> I am creating a single client per partition in my mapPartitions call. I
>>> am not sure whether that is the gating factor. Perhaps I should use a
>>> pool of clients instead?
>>>
>>> Would really appreciate some pointers.
>>>
>>> Thanks in advance for any help you can provide.
>>>
>>> -sujit
>>>
>>>
>>> On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal <su...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am trying to run a Spark job that hits an external webservice to get
>>>> back some information. The cluster is 1 master + 4 workers, each worker has
>>>> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
>>>> and is accessed using code similar to that shown below.
>>>>
>>>> def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
>>>>   val solr = new HttpSolrClient()
>>>>   initializeSolrParameters(solr)
>>>>   keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>>>> }
>>>>
>>>> myRDD.repartition(10)
>>>>   .mapPartitions(keyValues => getResults(keyValues))
>>>>
>>>> The mapPartitions call initializes the SolrJ client once per partition
>>>> and then uses it for each record in the partition via the getResults()
>>>> call.
>>>>
>>>> I repartitioned in the hope that this would result in 10 clients hitting
>>>> Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
>>>> clients if I can). However, I counted the number of open connections using
>>>> netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and
>>>> observed that Solr has a constant 4 connections (i.e., equal to the number
>>>> of workers) over the lifetime of the run.
>>>>
>>>> My observation leads me to believe that each worker processes a single
>>>> stream of work sequentially. However, from what I understand about how
>>>> Spark works, each worker should be able to process multiple tasks in
>>>> parallel, and repartition() is a hint for it to do so.
>>>>
>>>> Is there some SparkConf setting or environment variable I should set to
>>>> increase parallelism on these workers, or should I just configure a cluster
>>>> with multiple workers per machine? Or is there something I am doing wrong?
>>>>
>>>> Thank you in advance for any pointers you can provide.
>>>>
>>>> -sujit
>>>>
>>>>
>>>
>

RE: How to increase parallelism of a Spark cluster?

Posted by Silvio Fiorito <si...@granturing.com>.
Can you share the transformations up to the foreachPartition?
________________________________
From: Sujit Pal<ma...@gmail.com>
Sent: 8/2/2015 4:42 PM
To: Igor Berman<ma...@gmail.com>
Cc: user<ma...@spark.apache.org>
Subject: Re: How to increase parallelism of a Spark cluster?

Hi Igor,

The cluster is a Databricks Spark cluster. It consists of 1 master + 4 workers; each worker has 60GB RAM and 4 CPUs. The original mail has some more details (also, the reference to HttpSolrClient in there should be HttpSolrServer; sorry about that, a mistake while writing the email).

There is no additional configuration on the external Solr host from my code; I am using the default HttpClient provided by HttpSolrServer. According to the Javadocs, you can pass in an HttpClient object as well. Is there some specific configuration you would suggest to get past any limits?

On another project, I faced a similar problem but had more leeway (I was using a Spark cluster on EC2) and less time; my workaround was to use Python multiprocessing to create a program that started up 30 Python JSON/HTTP clients and wrote output into 30 output files, which were then processed by Spark. The reason I mention this is that I was using default configurations there as well; I just needed to increase the number of connections against Solr to a higher number.

This time round, I would like to do this through Spark because it makes the pipeline less complex.

-sujit


On Sun, Aug 2, 2015 at 10:52 AM, Igor Berman <ig...@gmail.com> wrote:

What kind of cluster? How many cores on each worker? Is there a config for the HTTP Solr client? I remember the standard HttpClient has a limit per route/host.

On Aug 2, 2015 8:17 PM, "Sujit Pal" <su...@gmail.com> wrote:
No one has any ideas?

Is there some more information I should provide?

I am looking for ways to increase the parallelism among workers. Currently I see only as many simultaneous connections to Solr as there are workers. My number of partitions is about 2.5x the number of workers, and the workers seem to be large enough to handle more than one task at a time.

I am creating a single client per partition in my mapPartitions call. I am not sure whether that is the gating factor. Perhaps I should use a pool of clients instead?

Would really appreciate some pointers.

Thanks in advance for any help you can provide.

-sujit


On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal <su...@gmail.com> wrote:
Hello,

I am trying to run a Spark job that hits an external webservice to get back some information. The cluster is 1 master + 4 workers, each worker has 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server, and is accessed using code similar to that shown below.

def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
  val solr = new HttpSolrClient()
  initializeSolrParameters(solr)
  keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
}

myRDD.repartition(10)
  .mapPartitions(keyValues => getResults(keyValues))

The mapPartitions call initializes the SolrJ client once per partition and then uses it for each record in the partition via the getResults() call.

I repartitioned in the hope that this would result in 10 clients hitting Solr simultaneously (I would like to go up to maybe 30-40 simultaneous clients if I can). However, I counted the number of open connections using netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and observed that Solr has a constant 4 connections (i.e., equal to the number of workers) over the lifetime of the run.

My observation leads me to believe that each worker processes a single stream of work sequentially. However, from what I understand about how Spark works, each worker should be able to process multiple tasks in parallel, and repartition() is a hint for it to do so.

Is there some SparkConf setting or environment variable I should set to increase parallelism on these workers, or should I just configure a cluster with multiple workers per machine? Or is there something I am doing wrong?

Thank you in advance for any pointers you can provide.

-sujit




Re: How to increase parallelism of a Spark cluster?

Posted by Steve Loughran <st...@hortonworks.com>.
On 2 Aug 2015, at 13:42, Sujit Pal <su...@gmail.com> wrote:

There is no additional configuration on the external Solr host from my code; I am using the default HttpClient provided by HttpSolrServer. According to the Javadocs, you can pass in an HttpClient object as well. Is there some specific configuration you would suggest to get past any limits?


Usually there's some thread pooling going on client-side, covered in docs like
http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html

I don't know whether that applies here, or how to tune it, etc. I do know that if you go the other way and allow unlimited connections, you raise "different" support problems.
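
If the SolrJ client is left to create its own HttpClient, the same client-side
limits can, I believe, be raised through the HttpSolrServer setters instead; a
sketch with made-up values, assuming SolrJ 4.x:

import org.apache.solr.client.solrj.impl.HttpSolrServer

val solr = new HttpSolrServer("http://solr-host:8983/solr/collection1")
// These setters only apply when HttpSolrServer builds its own internal HttpClient.
solr.setMaxTotalConnections(40)
solr.setDefaultMaxConnectionsPerHost(40)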

-steve

Re: How to increase parallelism of a Spark cluster?

Posted by Sujit Pal <su...@gmail.com>.
Hi Igor,

The cluster is a Databricks Spark cluster. It consists of 1 master + 4
workers, each worker has 60GB RAM and 4 CPUs. The original mail has some
more details (also, the reference to HttpSolrClient in there should be
HttpSolrServer; sorry about that, a mistake while writing the email).

There is no additional configuration on the external Solr host from my
code; I am using the default HttpClient provided by HttpSolrServer.
According to the Javadocs, you can pass in an HttpClient object as well. Is
there some specific configuration you would suggest to get past any limits?

On another project, I faced a similar problem but had more leeway (I was
using a Spark cluster on EC2) and less time; my workaround was to use
Python multiprocessing to create a program that started up 30 Python
JSON/HTTP clients and wrote output into 30 output files, which were then
processed by Spark. The reason I mention this is that I was using default
configurations there as well; I just needed to increase the number of
connections against Solr to a higher number.

This time round, I would like to do this through Spark because it makes the
pipeline less complex.

-sujit


On Sun, Aug 2, 2015 at 10:52 AM, Igor Berman <ig...@gmail.com> wrote:

> What kind of cluster? How many cores on each worker? Is there a config for
> the HTTP Solr client? I remember the standard HttpClient has a limit per route/host.
> On Aug 2, 2015 8:17 PM, "Sujit Pal" <su...@gmail.com> wrote:
>
>> No one has any ideas?
>>
>> Is there some more information I should provide?
>>
>> I am looking for ways to increase the parallelism among workers.
>> Currently I see only as many simultaneous connections to Solr as there
>> are workers. My number of partitions is about 2.5x the number of workers,
>> and the workers seem to be large enough to handle more than one task at a
>> time.
>>
>> I am creating a single client per partition in my mapPartitions call. I
>> am not sure whether that is the gating factor. Perhaps I should use a
>> pool of clients instead?
>>
>> Would really appreciate some pointers.
>>
>> Thanks in advance for any help you can provide.
>>
>> -sujit
>>
>>
>> On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal <su...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I am trying to run a Spark job that hits an external webservice to get
>>> back some information. The cluster is 1 master + 4 workers, each worker has
>>> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
>>> and is accessed using code similar to that shown below.
>>>
>>> def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
>>>   val solr = new HttpSolrClient()
>>>   initializeSolrParameters(solr)
>>>   keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>>> }
>>>
>>> myRDD.repartition(10)
>>>   .mapPartitions(keyValues => getResults(keyValues))
>>>
>>> The mapPartitions call initializes the SolrJ client once per partition
>>> and then uses it for each record in the partition via the getResults()
>>> call.
>>>
>>> I repartitioned in the hope that this would result in 10 clients hitting
>>> Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
>>> clients if I can). However, I counted the number of open connections using
>>> netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and
>>> observed that Solr has a constant 4 connections (i.e., equal to the number
>>> of workers) over the lifetime of the run.
>>>
>>> My observation leads me to believe that each worker processes a single
>>> stream of work sequentially. However, from what I understand about how
>>> Spark works, each worker should be able to process multiple tasks in
>>> parallel, and repartition() is a hint for it to do so.
>>>
>>> Is there some SparkConf setting or environment variable I should set to
>>> increase parallelism on these workers, or should I just configure a cluster
>>> with multiple workers per machine? Or is there something I am doing wrong?
>>>
>>> Thank you in advance for any pointers you can provide.
>>>
>>> -sujit
>>>
>>>
>>

Re: How to increase parallelism of a Spark cluster?

Posted by Igor Berman <ig...@gmail.com>.
What kind of cluster? How many cores on each worker? Is there a config for
the HTTP Solr client? I remember the standard HttpClient has a limit per
route/host.
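
A sketch of one way to raise those client-side limits through SolrJ's own
helper (assuming SolrJ 4.x; the property names come from HttpClientUtil, and
the URL and numbers are illustrative):

import org.apache.solr.client.solrj.impl.{HttpClientUtil, HttpSolrServer}
import org.apache.solr.common.params.ModifiableSolrParams

// Build an HttpClient with higher connection limits and pass it to SolrJ.
val params = new ModifiableSolrParams()
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 40)
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 40)
val httpClient = HttpClientUtil.createClient(params)
val solr = new HttpSolrServer("http://solr-host:8983/solr/collection1", httpClient)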
On Aug 2, 2015 8:17 PM, "Sujit Pal" <su...@gmail.com> wrote:

> No one has any ideas?
>
> Is there some more information I should provide?
>
> I am looking for ways to increase the parallelism among workers. Currently
> I see only as many simultaneous connections to Solr as there are workers.
> My number of partitions is about 2.5x the number of workers, and the
> workers seem to be large enough to handle more than one task at a time.
>
> I am creating a single client per partition in my mapPartitions call. I am
> not sure whether that is the gating factor. Perhaps I should use a pool of
> clients instead?
>
> Would really appreciate some pointers.
>
> Thanks in advance for any help you can provide.
>
> -sujit
>
>
> On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal <su...@gmail.com> wrote:
>
>> Hello,
>>
>> I am trying to run a Spark job that hits an external webservice to get
>> back some information. The cluster is 1 master + 4 workers, each worker has
>> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
>> and is accessed using code similar to that shown below.
>>
>> def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
>>   val solr = new HttpSolrClient()
>>   initializeSolrParameters(solr)
>>   keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>> }
>>
>> myRDD.repartition(10)
>>   .mapPartitions(keyValues => getResults(keyValues))
>>
>> The mapPartitions call initializes the SolrJ client once per partition
>> and then uses it for each record in the partition via the getResults()
>> call.
>>
>> I repartitioned in the hope that this would result in 10 clients hitting
>> Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
>> clients if I can). However, I counted the number of open connections using
>> netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and
>> observed that Solr has a constant 4 connections (i.e., equal to the number
>> of workers) over the lifetime of the run.
>>
>> My observation leads me to believe that each worker processes a single
>> stream of work sequentially. However, from what I understand about how
>> Spark works, each worker should be able to process multiple tasks in
>> parallel, and repartition() is a hint for it to do so.
>>
>> Is there some SparkConf setting or environment variable I should set to
>> increase parallelism on these workers, or should I just configure a cluster
>> with multiple workers per machine? Or is there something I am doing wrong?
>>
>> Thank you in advance for any pointers you can provide.
>>
>> -sujit
>>
>>
>