Posted to dev@spark.apache.org by Mike Hynes <91...@gmail.com> on 2016/04/04 15:12:13 UTC

RDD Partitions not distributed evenly to executors

[ CC'ing the dev list, since nearly identical questions have come up on
the user list recently without resolution;
cf.:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
]

Hello,

In short, I'm reporting a problem concerning load imbalance of RDD
partitions across a standalone cluster. Though there are 16 cores
available per node, certain nodes will have >16 partitions, and some
will correspondingly have <16 (and even 0).

In more detail: I am running some scalability/performance tests for
vector-type operations. The RDDs I'm considering are simple block
vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
are generated with a fixed number of elements given by some multiple
of the available cores, and subsequently hash-partitioned by their
integer block index.

I have verified that the hash partitioning key distribution, as well
as the keys themselves, are both correct; the problem is truly that
the partitions are *not* evenly distributed across the nodes.
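
One way to sanity-check the key distribution is to reproduce the
partitioner's arithmetic outside of Spark. The sketch below assumes the
rule used by Spark's HashPartitioner (the key's hashCode modulo the
number of partitions, made non-negative); the object and method names
are hypothetical, and the block counts in the usage note are
illustrative rather than taken from the runs above.

```scala
object HashPlacement {
  // Same rule as Spark's HashPartitioner: hashCode modulo numPartitions,
  // shifted into the range [0, numPartitions). For an Int key the
  // hashCode is the integer itself.
  def partitionOf(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  /** Number of block keys in 0 until numBlocks that land in each partition. */
  def keysPerPartition(numBlocks: Int, numPartitions: Int): Map[Int, Int] =
    (0 until numBlocks)
      .groupBy(k => partitionOf(k, numPartitions))
      .map { case (p, ks) => p -> ks.size }
}
```

For example, HashPlacement.keysPerPartition(256, 64) puts exactly 4 keys
in each of the 64 partitions, so any imbalance seen later has to come
from where the partitions are placed, not from the keys.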

For instance, here is a representative output for some stages and
tasks in an iterative program. This is a very simple test with 2
nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
example stages from the stderr log are stages 7 and 9:
7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639

When counting the location of the partitions on the compute nodes from
the stderr logs, however, you can clearly see the imbalance. Example
lines are:
13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&

Grep'ing the full set of above lines for each hostname, himrod-?,
shows the problem occurs in each stage. Below is the output, where the
number of partitions stored on each node is given alongside its
hostname as in (himrod-?,num_partitions):
Stage 7: (himrod-1,0) (himrod-2,64)
Stage 9: (himrod-1,16) (himrod-2,48)
Stage 12: (himrod-1,0) (himrod-2,64)
Stage 14: (himrod-1,16) (himrod-2,48)
The imbalance is also visible when the executor ID is used to count
the partitions operated on by executors.
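
The per-host counts above can be reproduced with a few lines of Scala
instead of grep. This is only a sketch: it assumes each wrapped log
entry has first been joined back onto a single line, and the names
TaskPlacement and countByStageAndHost are hypothetical.

```scala
object TaskPlacement {
  // Matches the driver's "Starting task" lines quoted above, once each
  // wrapped log entry has been joined back onto a single line.
  private val Started =
    """Starting task \d+\.\d+ in stage (\d+)\.\d+ \(TID \d+, ([^,\s]+), partition \d+""".r

  /** Number of tasks launched per (stage, host). */
  def countByStageAndHost(lines: Seq[String]): Map[(Int, String), Int] =
    lines
      .flatMap(line => Started.findFirstMatchIn(line))
      .map(m => (m.group(1).toInt, m.group(2)))
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }
}
```

Feeding it the three example lines above yields a single entry,
(7, "himrod-2") -> 3. Lines that do not match the pattern are ignored.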

I am working off a fairly recent modification of the 2.0.0-SNAPSHOT branch
(but the modifications do not touch the scheduler, and are irrelevant
for these particular tests). Has something changed radically in 1.6+
that would make a previously (<=1.5) correct configuration go haywire?
Have new configuration settings been added of which I'm unaware that
could lead to this problem?

Please let me know if others in the community have observed this, and
thank you for your time,
Mike

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: RDD Partitions not distributed evenly to executors

Posted by Michael Slavitch <sl...@gmail.com>.
Just to be sure:  Have spark-env.sh and spark-defaults.conf been correctly propagated to all nodes?  Are they identical?



Re: RDD Partitions not distributed evenly to executors

Posted by Thunder Stumpges <th...@gmail.com>.
Has anyone figured this out yet!? I have gone looking for this exact
problem (Spark 1.6.1) and I cannot get my partitions to be distributed
evenly across executors no matter what I've tried. It has been mentioned
several other times in the user group as well as the dev group (by
Mike Hynes initially, as well as a few others I found).

It seems this has been a known issue for the better part of 6 months. Hard
to believe it has had no real progress.

Has anyone got a workaround or something that can spread the cached RDD
partitions evenly across executors?

This is having a major performance impact on my Spark Streaming application
which is extremely imbalanced currently.

Thanks in advance!
Thunder



Re: RDD Partitions not distributed evenly to executors

Posted by Mike Hynes <91...@gmail.com>.
Hello All (and Devs in particular),

Thank you again for your further responses. Please find a detailed
email below which identifies the cause (I believe) of the partition
imbalance problem, which occurs in Spark 1.5, 1.6, and a 2.0-SNAPSHOT.
This is followed by questions for the dev community with
more intimate knowledge of the scheduler, so that they may confirm my
guess at the cause and provide insight into how best to avoid
the problem.

Attached to this email are Gantt-chart plots which show the task
execution over elapsed time in a Spark program. This program was meant
to investigate the simplest possible vector operation for block-vector
data stored in RDDs of type RDD[(Int,Vector)]. In the Gantt plots,
you'll see the tasks shown as horizontal lines along the x axis, which
shows elapsed time. The shaded regions represent a single executor
such that all tasks managed by a single executor lie in a contiguous
shaded region. The executors each managed 16 cores on 4 different
compute nodes, and the tasks have been sorted and fit into 16 slots
for each executor according to their chronological order, as determined
by the task information in the event log for the program, such that
the y-axis corresponds essentially to a unique core id, ranging from
1 to 64. The numbers running horizontally along the top of these plots
are the stage numbers, as determined by the DAG scheduler.
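
The slot-fitting step for a single executor can be sketched as follows.
The exact procedure behind the attached plots is not spelled out here,
so this is one plausible reading: visit the tasks in order of start time
and give each one the lowest-numbered slot that is already free. The
Task fields and the names GanttSlots and assign are hypothetical.

```scala
object GanttSlots {
  final case class Task(id: Int, start: Long, end: Long)

  /** Assigns each task to the lowest-numbered slot that is free at its
    * start time, visiting tasks in chronological order. Returns a map
    * from task id to slot index (0 until slots). */
  def assign(tasks: Seq[Task], slots: Int): Map[Int, Int] = {
    // freeAt(i) is the time at which slot i next becomes free.
    val freeAt = Array.fill(slots)(Long.MinValue)
    tasks.sortBy(_.start).map { t =>
      val slot = freeAt.indexWhere(_ <= t.start)
      require(slot >= 0, s"task ${t.id} overlaps more than $slots running tasks")
      freeAt(slot) = t.end
      t.id -> slot
    }.toMap
  }
}
```

Running this once per executor with slots = 16, and offsetting the slot
index by 16 times the executor's position, would give a y-axis value in
the range 1 to 64 as described above.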

In the program itself, two block vectors, v_1 and v_2, were created
and copartitioned, cached, and then added together elementwise through
a join operation on their block index keys. Stages 0 and 1 correspond
to the map and count operations to create v_1; stages 2 and 3
correspond to the same operations on v_2; and stages 6 through 15
consist of identical count operations to materialize the vector v =
v_1 + v_2, formed through a join on v_1 and v_2. The vectors v_1 and
v_2 were initialized by first creating the keys using a
sc.parallelize{0 to num_blocks - 1} operation, after which the keys
were partitioned with a HashPartitioner (note that first a dummy map
{k => (k,k)} on the keys was done so that the HashPartitioner could be
used; the motivation for this was that, for large block vector RDDs,
it would be better to hash partition the keys before generating the
data). The size of the vectors is determined as a multiple of a fixed
vector block size (size of each sub-block) times the number of
partitions, which is itself an integer multiple of the number of
cores. In summary: each partition has \gamma blocks, there are \alpha
partitions per core, and each block has size 2^16.
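
The sizing rule can be written down as a small calculation. This is a
sketch of the arithmetic only, not of the test program itself; the names
VectorLayout, Layout and layout are hypothetical.

```scala
object VectorLayout {
  final case class Layout(partitions: Int, blocks: Int, length: Long)

  /** alpha = partitions per core, gamma = blocks per partition. */
  def layout(cores: Int, alpha: Int, gamma: Int, blockSize: Int = 1 << 16): Layout = {
    val partitions = alpha * cores
    val blocks = gamma * partitions
    // Use Long for the element count so large vectors do not overflow Int.
    Layout(partitions, blocks, blocks.toLong * blockSize)
  }
}
```

With 64 cores, the two parameter settings used in the runs described in
this email give layout(64, alpha = 4, gamma = 4) = Layout(256, 1024,
67108864) and layout(64, alpha = 1, gamma = 16) = Layout(64, 1024,
67108864): the same vector length split into a different number of
partitions.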

The first plot, 02_4node_imbalance_spark1.6_run2.pdf, shows a
representative run of the block vector addition program for \alpha =
4, \gamma = 4. A well-balanced partitioning would correspond to 4
partitions per core, such that each executor is managing 64 tasks.
However, as you can see in stage 0, this does not occur: there is a
large imbalance, where cores 46--64 have many more tasks to compute
than the others.

Observing the order of the task assignment, I believe that what is
happening here is that, due to the initial random delay of the
executors in responding/receiving master instructions, the driver is
assigning more tasks to the executor whose initial wave of tasks
finishes first. Since there is *no* data locality in stage 0 to factor
into determining on which nodes the computation should occur, my
understanding is that the driver will allocate the tasks
greedily---hence the initial delay is crucial for allocating
partitions evenly across the nodes. Furthermore, note that stage 2 (an
identical vector initialization operation to stage 0) is
well-balanced, since all of the executors completed tasks at
approximately the same time, and hence without data locality being a
factor, were assigned new tasks at the same rate. Also, note here that
the individual task durations are *decreasing markedly* through stages
6--15 (again, all of which are identical), but that the stages are
longer than need be due to the load imbalance of the tasks.

The second plot, 02_4node_balance_longer.pdf, shows a second version
of this same program. The code is identical; however, the command-line
input parameters have been changed such that there were 64 partitions
(\alpha = 1 partition per core), an identical block size of 2^16, but
\gamma = 16 blocks per partition, i.e. fewer yet larger partitions
such that the vector is the same size. Here, stages 0 and 2 are both
evenly partitioned; since the tasks in these stages are longer than
the initial executor delay, no imbalance is created. However, despite
the better balance in partitions across the nodes, this program takes
*longer* in total elapsed time, and the tasks do not seem to be
getting shorter by the same proportion as in the previous test with
more partitions.
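
The greedy-assignment hypothesis can be illustrated with a toy model.
This is not Spark's scheduler: it simply hands every task to whichever
slot frees up first, with no locality preference, and all delays and
durations used with it are made-up numbers. The names GreedyToy and
tasksPerExecutor are hypothetical.

```scala
import scala.collection.mutable

object GreedyToy {
  /** Hands out `numTasks` equal-length tasks, always to the slot that
    * frees up first. `startDelays(e)` is the time at which executor e
    * first becomes available. Returns the number of tasks each
    * executor ran. */
  def tasksPerExecutor(startDelays: Seq[Long], slotsPerExecutor: Int,
                       numTasks: Int, taskDuration: Long): Vector[Int] = {
    // Min-heap of (time the slot becomes free, executor index).
    val free = mutable.PriorityQueue.empty[(Long, Int)](
      Ordering.Tuple2[Long, Int].reverse)
    for ((delay, e) <- startDelays.zipWithIndex; _ <- 0 until slotsPerExecutor)
      free.enqueue((delay, e))

    val counts = Array.fill(startDelays.size)(0)
    for (_ <- 0 until numTasks) {
      val (t, e) = free.dequeue()          // earliest free slot wins
      counts(e) += 1
      free.enqueue((t + taskDuration, e))  // slot is busy until the task ends
    }
    counts.toVector
  }
}
```

With 4 executors of 16 slots that come up at times 0, 100, 200 and 300,
256 tasks of duration 100 are split 96, 80, 48 and 32 across the
executors, whereas 64 tasks of duration 1000 are split 16, 16, 16 and
16. In this model, at least, tasks shorter than the startup stagger
produce the kind of skew seen in the first plot, and tasks longer than
it do not.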

Given the above, I would like to ask the following questions:

1. Is my inference correct that the partition imbalance arises due to
the greedy nature of the scheduler when data locality is not a factor?

2a. Why are the task times in plot 1 decreasing so dramatically, but
not in plot 2?
2b. Could the decrease in time be due to just-in-time compilation?
2c. If so, why would the JIT occur only for the first case with many
partitions when the same amount of computational work is to be done in
both cases?

3. If an RDD is to be created in such a manner (i.e. initialized for,
say, an iterative algorithm, rather than by reading data from disk or
HDFS), what is the best practice to promote good load balancing? My
first idea would be to create the full RDD with 2x as many partitions
but then coalesce it down to half the number of partitions with the
shuffle flag set to true. Would that be reasonable?
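
For concreteness, the idea in question 3 might look like the sketch
below. It assumes spark-core is on the classpath and uses a local master
purely for illustration; the names CoalesceSketch, build and sizes are
hypothetical, and whether this actually improves placement across
executors is exactly what is being asked.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoalesceSketch {
  /** Builds the keys with twice the target partition count, then
    * shuffles them down to `targetPartitions`. */
  def build(sc: SparkContext, numBlocks: Int, targetPartitions: Int): RDD[(Int, Int)] =
    sc.parallelize(0 until numBlocks, 2 * targetPartitions)
      .map(k => (k, k)) // same dummy (k, k) pairing as described earlier
      .coalesce(targetPartitions, shuffle = true)

  /** Records per partition, to check how evenly the data ended up. */
  def sizes(rdd: RDD[(Int, Int)]): Array[Int] =
    rdd.mapPartitions(it => Iterator(it.size)).collect()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("coalesce-sketch").setMaster("local[4]"))
    try {
      val rdd = build(sc, numBlocks = 1024, targetPartitions = 64)
      println(sizes(rdd).mkString(" "))
      println(s"partitioner after coalesce: ${rdd.partitioner}")
    } finally sc.stop()
  }
}
```

Two caveats, as far as I can tell: coalesce with shuffle = true decides
which records go into which partition, not which executor runs each
task, and the RDD it returns carries no partitioner, so a partitionBy
with the shared HashPartitioner would still be needed afterwards to keep
v_1 and v_2 copartitioned for the join.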

Thank you very much for your time, and I very much hope that someone
from the dev community who is familiar with the scheduler may be able
to clarify the above observations and questions.

Thanks,
Mike

P.S. Koert Kuipers: neither spark-defaults.conf setting impacted the
observed behaviour, but thank you kindly for your suggestions.


On 4/5/16, Khaled Ammar <kh...@gmail.com> wrote:
> I have a similar experience.
>
> Using 32 machines, I can see that the number of tasks (partitions) assigned to
> executors (machines) is not even. Moreover, the distribution changes every
> stage (iteration).
>
> I wonder why Spark needs to move partitions around anyway; should the
> scheduler not reduce network (and other I/O) overhead by reducing such
> relocation?
>
> Thanks,
> -Khaled
>
>
>
>
> On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> can you try:
>> spark.shuffle.reduceLocality.enabled=false
>>
>> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91...@gmail.com> wrote:
>>
>>> Dear all,
>>>
>>> Thank you for your responses.
>>>
>>> Michael Slavitch:
>>> > Just to be sure:  Have spark-env.sh and spark-defaults.conf been
>>> > correctly propagated to all nodes?  Are they identical?
>>> Yes; these files are stored on a shared memory directory accessible to
>>> all nodes.
>>>
>>> Koert Kuipers:
>>> > we ran into similar issues and it seems related to the new memory
>>> > management. can you try:
>>> > spark.memory.useLegacyMode = true
>>> I reran the exact same code with a restarted cluster using this
>>> modification, and did not observe any difference. The partitioning is
>>> still imbalanced.
>>>
>>> Ted Yu:
>>> > If the changes can be ported over to 1.6.1, do you mind reproducing
>>> > the
>>> issue there ?
>>> Since the spark.memory.useLegacyMode setting did not impact my code
>>> execution, I will have to change the Spark dependency back to earlier
>>> versions to see if the issue persists and get back to you.
>>>
>>> Meanwhile, if anyone else has any other ideas or experience, please let
>>> me know.
>>>
>>> Mike
>>>
>>> On 4/4/16, Koert Kuipers <ko...@tresata.com> wrote:
>>> > we ran into similar issues and it seems related to the new memory
>>> > management. can you try:
>>> > spark.memory.useLegacyMode = true
>>> >
>>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91...@gmail.com> wrote:
>>> >
>>> >> [ CC'ing dev list since nearly identical questions have occurred in
>>> >> user list recently w/o resolution;
>>> >> c.f.:
>>> >>
>>> >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>>> >>
>>> >>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>>> >> ]
>>> >>
>>> >> Hello,
>>> >>
>>> >> In short, I'm reporting a problem concerning load imbalance of RDD
>>> >> partitions across a standalone cluster. Though there are 16 cores
>>> >> available per node, certain nodes will have >16 partitions, and some
>>> >> will correspondingly have <16 (and even 0).
>>> >>
>>> >> In more detail: I am running some scalability/performance tests for
>>> >> vector-type operations. The RDDs I'm considering are simple block
>>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>>> >> are generated with a fixed number of elements given by some multiple
>>> >> of the available cores, and subsequently hash-partitioned by their
>>> >> integer block index.
>>> >>
>>> >> I have verified that the hash partitioning key distribution, as well
>>> >> as the keys themselves, are both correct; the problem is truly that
>>> >> the partitions are *not* evenly distributed across the nodes.
>>> >>
>>> >> For instance, here is a representative output for some stages and
>>> >> tasks in an iterative program. This is a very simple test with 2
>>> >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>>> >> examples stages from the stderr log are stages 7 and 9:
>>> >> 7,mapPartitions at
>>> >> DummyVector.scala:113,64,1459771364404,1459771365272
>>> >> 9,mapPartitions at
>>> >> DummyVector.scala:113,64,1459771364431,1459771365639
>>> >>
>>> >> When counting the location of the partitions on the compute nodes
>>> >> from
>>> >> the stderr logs, however, you can clearly see the imbalance. Examples
>>> >> lines are:
>>> >> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
>>> >> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>>> >> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
>>> >> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>>> >> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
>>> >> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>>> >>
>>> >> Grep'ing the full set of above lines for each hostname, himrod-?,
>>> >> shows the problem occurs in each stage. Below is the output, where
>>> >> the
>>> >> number of partitions stored on each node is given alongside its
>>> >> hostname as in (himrod-?,num_partitions):
>>> >> Stage 7: (himrod-1,0) (himrod-2,64)
>>> >> Stage 9: (himrod-1,16) (himrod-2,48)
>>> >> Stage 12: (himrod-1,0) (himrod-2,64)
>>> >> Stage 14: (himrod-1,16) (himrod-2,48)
>>> >> The imbalance is also visible when the executor ID is used to count
>>> >> the partitions operated on by executors.
>>> >>
>>> >> I am working off a fairly recent modification of 2.0.0-SNAPSHOT
>>> >> branch
>>> >> (but the modifications do not touch the scheduler, and are irrelevant
>>> >> for these particular tests). Has something changed radically in 1.6+
>>> >> that would make a previously (<=1.5) correct configuration go
>>> >> haywire?
>>> >> Have new configuration settings been added of which I'm unaware that
>>> >> could lead to this problem?
>>> >>
>>> >> Please let me know if others in the community have observed this, and
>>> >> thank you for your time,
>>> >> Mike
>>> >>
>>> >> ---------------------------------------------------------------------
>>> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >> For additional commands, e-mail: user-help@spark.apache.org
>>> >>
>>> >>
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Mike
>>>
>>
>>
>
>
> --
> Thanks,
> -Khaled
>


-- 
Thanks,
Mike

Re: RDD Partitions not distributed evenly to executors

Posted by Khaled Ammar <kh...@gmail.com>.
I have a similar experience.

Using 32 machines, I can see that the number of tasks (partitions) assigned
to executors (machines) is not even. Moreover, the distribution changes
every stage (iteration).

I wonder why Spark needs to move partitions around at all. Shouldn't the
scheduler reduce network (and other IO) overhead by avoiding such
relocation?

Thanks,
-Khaled




On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers <ko...@tresata.com> wrote:

> can you try:
> spark.shuffle.reduceLocality.enabled=false
>
> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91...@gmail.com> wrote:
>
>> Dear all,
>>
>> Thank you for your responses.
>>
>> Michael Slavitch:
>> > Just to be sure:  Has spark-env.sh and spark-defaults.conf been
>> correctly propagated to all nodes?  Are they identical?
>> Yes; these files are stored on a shared memory directory accessible to
>> all nodes.
>>
>> Koert Kuipers:
>> > we ran into similar issues and it seems related to the new memory
>> > management. can you try:
>> > spark.memory.useLegacyMode = true
>> I reran the exact same code with a restarted cluster using this
>> modification, and did not observe any difference. The partitioning is
>> still imbalanced.
>>
>> Ted Yu:
>> > If the changes can be ported over to 1.6.1, do you mind reproducing the
>> issue there ?
>> Since the spark.memory.useLegacyMode setting did not impact my code
>> execution, I will have to change the Spark dependency back to earlier
>> versions to see if the issue persists and get back to you.
>>
>> Meanwhile, if anyone else has any other ideas or experience, please let
>> me know.
>>
>> Mike
>>
>> On 4/4/16, Koert Kuipers <ko...@tresata.com> wrote:
>> > we ran into similar issues and it seems related to the new memory
>> > management. can you try:
>> > spark.memory.useLegacyMode = true
>> >
>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91...@gmail.com> wrote:
>> >
>> >> [ CC'ing dev list since nearly identical questions have occurred in
>> >> user list recently w/o resolution;
>> >> c.f.:
>> >>
>> >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>> >>
>> >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>> >> ]
>> >>
>> >> Hello,
>> >>
>> >> In short, I'm reporting a problem concerning load imbalance of RDD
>> >> partitions across a standalone cluster. Though there are 16 cores
>> >> available per node, certain nodes will have >16 partitions, and some
>> >> will correspondingly have <16 (and even 0).
>> >>
>> >> In more detail: I am running some scalability/performance tests for
>> >> vector-type operations. The RDDs I'm considering are simple block
>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>> >> are generated with a fixed number of elements given by some multiple
>> >> of the available cores, and subsequently hash-partitioned by their
>> >> integer block index.
>> >>
>> >> I have verified that the hash partitioning key distribution, as well
>> >> as the keys themselves, are both correct; the problem is truly that
>> >> the partitions are *not* evenly distributed across the nodes.
>> >>
>> >> For instance, here is a representative output for some stages and
>> >> tasks in an iterative program. This is a very simple test with 2
>> >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>> >> examples stages from the stderr log are stages 7 and 9:
>> >> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
>> >> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
>> >>
>> >> When counting the location of the partitions on the compute nodes from
>> >> the stderr logs, however, you can clearly see the imbalance. Examples
>> >> lines are:
>> >> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
>> >> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>> >> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
>> >> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>> >> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
>> >> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>> >>
>> >> Grep'ing the full set of above lines for each hostname, himrod-?,
>> >> shows the problem occurs in each stage. Below is the output, where the
>> >> number of partitions stored on each node is given alongside its
>> >> hostname as in (himrod-?,num_partitions):
>> >> Stage 7: (himrod-1,0) (himrod-2,64)
>> >> Stage 9: (himrod-1,16) (himrod-2,48)
>> >> Stage 12: (himrod-1,0) (himrod-2,64)
>> >> Stage 14: (himrod-1,16) (himrod-2,48)
>> >> The imbalance is also visible when the executor ID is used to count
>> >> the partitions operated on by executors.
>> >>
>> >> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
>> >> (but the modifications do not touch the scheduler, and are irrelevant
>> >> for these particular tests). Has something changed radically in 1.6+
>> >> that would make a previously (<=1.5) correct configuration go haywire?
>> >> Have new configuration settings been added of which I'm unaware that
>> >> could lead to this problem?
>> >>
>> >> Please let me know if others in the community have observed this, and
>> >> thank you for your time,
>> >> Mike
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> For additional commands, e-mail: user-help@spark.apache.org
>> >>
>> >>
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>
>


-- 
Thanks,
-Khaled

Re: RDD Partitions not distributed evenly to executors

Posted by Koert Kuipers <ko...@tresata.com>.
can you try:
spark.shuffle.reduceLocality.enabled=false
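
For anyone wanting to try this, a minimal sketch of one way to set the
property from application code. The property name is the one given
above; the app name is a hypothetical placeholder. The same key/value
pair could instead be placed in spark-defaults.conf or passed to
spark-submit with --conf.

```scala
import org.apache.spark.SparkConf

// "reduce-locality-test" is a hypothetical app name used for illustration.
val conf = new SparkConf()
  .setAppName("reduce-locality-test")
  .set("spark.shuffle.reduceLocality.enabled", "false")
```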

On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91...@gmail.com> wrote:

> Dear all,
>
> Thank you for your responses.
>
> Michael Slavitch:
> > Just to be sure:  Has spark-env.sh and spark-defaults.conf been
> correctly propagated to all nodes?  Are they identical?
> Yes; these files are stored on a shared memory directory accessible to
> all nodes.
>
> Koert Kuipers:
> > we ran into similar issues and it seems related to the new memory
> > management. can you try:
> > spark.memory.useLegacyMode = true
> I reran the exact same code with a restarted cluster using this
> modification, and did not observe any difference. The partitioning is
> still imbalanced.
>
> Ted Yu:
> > If the changes can be ported over to 1.6.1, do you mind reproducing the
> issue there ?
> Since the spark.memory.useLegacyMode setting did not impact my code
> execution, I will have to change the Spark dependency back to earlier
> versions to see if the issue persists and get back to you.
>
> Meanwhile, if anyone else has any other ideas or experience, please let me
> know.
>
> Mike
>
> On 4/4/16, Koert Kuipers <ko...@tresata.com> wrote:
> > we ran into similar issues and it seems related to the new memory
> > management. can you try:
> > spark.memory.useLegacyMode = true
> >
> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91...@gmail.com> wrote:
> >
> >> [ CC'ing dev list since nearly identical questions have occurred in
> >> user list recently w/o resolution;
> >> c.f.:
> >>
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
> >>
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
> >> ]
> >>
> >> Hello,
> >>
> >> In short, I'm reporting a problem concerning load imbalance of RDD
> >> partitions across a standalone cluster. Though there are 16 cores
> >> available per node, certain nodes will have >16 partitions, and some
> >> will correspondingly have <16 (and even 0).
> >>
> >> In more detail: I am running some scalability/performance tests for
> >> vector-type operations. The RDDs I'm considering are simple block
> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
> >> are generated with a fixed number of elements given by some multiple
> >> of the available cores, and subsequently hash-partitioned by their
> >> integer block index.
> >>
> >> I have verified that the hash partitioning key distribution, as well
> >> as the keys themselves, are both correct; the problem is truly that
> >> the partitions are *not* evenly distributed across the nodes.
> >>
> >> For instance, here is a representative output for some stages and
> >> tasks in an iterative program. This is a very simple test with 2
> >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
> >> examples stages from the stderr log are stages 7 and 9:
> >> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
> >> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
> >>
> >> When counting the location of the partitions on the compute nodes from
> >> the stderr logs, however, you can clearly see the imbalance. Examples
> >> lines are:
> >> 13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196,
> >> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
> >> 13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197,
> >> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
> >> 13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198,
> >> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
> >>
> >> Grep'ing the full set of above lines for each hostname, himrod-?,
> >> shows the problem occurs in each stage. Below is the output, where the
> >> number of partitions stored on each node is given alongside its
> >> hostname as in (himrod-?,num_partitions):
> >> Stage 7: (himrod-1,0) (himrod-2,64)
> >> Stage 9: (himrod-1,16) (himrod-2,48)
> >> Stage 12: (himrod-1,0) (himrod-2,64)
> >> Stage 14: (himrod-1,16) (himrod-2,48)
> >> The imbalance is also visible when the executor ID is used to count
> >> the partitions operated on by executors.
> >>
> >> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
> >> (but the modifications do not touch the scheduler, and are irrelevant
> >> for these particular tests). Has something changed radically in 1.6+
> >> that would make a previously (<=1.5) correct configuration go haywire?
> >> Have new configuration settings been added of which I'm unaware that
> >> could lead to this problem?
> >>
> >> Please let me know if others in the community have observed this, and
> >> thank you for your time,
> >> Mike
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >> For additional commands, e-mail: user-help@spark.apache.org
> >>
> >>
> >
>
>
> --
> Thanks,
> Mike
>

Re: RDD Partitions not distributed evenly to executors

Posted by Koert Kuipers <ko...@tresata.com>.
can you try:
spark.shuffle.reduceLocality.enabled=false
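
For readers unsure where such a setting goes, here is a minimal sketch. It assumes a standard Spark layout with a conf/spark-defaults.conf file, and it assumes the property name given above is recognized by the Spark build in use; neither has been verified against the poster's cluster.

```properties
# conf/spark-defaults.conf, read by spark-submit when the application is launched.
# Property name taken from the message above; restart the application after changing it.
spark.shuffle.reduceLocality.enabled   false
```

The same key can also be supplied for a single run with spark-submit's --conf key=value option, which avoids editing the shared defaults file.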


Re: RDD Partitions not distributed evenly to executors

Posted by Mike Hynes <91...@gmail.com>.
Dear all,

Thank you for your responses.

Michael Slavitch:
> Just to be sure: Have spark-env.sh and spark-defaults.conf been correctly propagated to all nodes? Are they identical?
Yes; these files are stored on a shared memory directory accessible to
all nodes.

Koert Kuipers:
> we ran into similar issues and it seems related to the new memory
> management. can you try:
> spark.memory.useLegacyMode = true
I reran the exact same code with a restarted cluster using this
modification, and did not observe any difference. The partitioning is
still imbalanced.
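
For anyone who wants to repeat the per-host count after changing a setting, here is a minimal sketch in Python. It assumes the driver log uses the same '&'-separated layout as the sample lines in the original report, with each record on a single line; the function name tally_tasks is hypothetical and not part of Spark.

```python
import re
from collections import Counter

# Matches the TaskSetManager "Starting task" records quoted in this thread and
# captures the stage number and the host the task was sent to.
TASK_LINE = re.compile(
    r"Starting task \d+\.\d+ in stage (\d+)\.\d+ "
    r"\(TID \d+,\s*([^,\s]+),\s*partition \d+"
)

def tally_tasks(lines):
    """Count launched tasks per (stage, host) from driver log lines."""
    counts = Counter()
    for line in lines:
        match = TASK_LINE.search(line)
        if match:
            stage, host = int(match.group(1)), match.group(2)
            counts[(stage, host)] += 1
    return counts

# The three sample records from the original report, unwrapped to one line each.
sample = [
    "13627&INFO&TaskSetManager&Starting task 0.0 in stage 7.0 (TID 196, himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&",
    "13628&INFO&TaskSetManager&Starting task 1.0 in stage 7.0 (TID 197, himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&",
    "13629&INFO&TaskSetManager&Starting task 2.0 in stage 7.0 (TID 198, himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&",
]

for (stage, host), n in sorted(tally_tasks(sample).items()):
    print(f"Stage {stage}: ({host},{n})")
```

Fed the full stderr log instead of the three sample records, the same loop would print one line per stage and host, which can be compared directly between runs.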

Ted Yu:
> If the changes can be ported over to 1.6.1, do you mind reproducing the issue there?
Since the spark.memory.useLegacyMode setting made no difference, I will
switch the Spark dependency back to earlier versions, check whether the
issue persists, and get back to you.

Meanwhile, if anyone else has any other ideas or experience, please let me know.

Mike


-- 
Thanks,
Mike

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: RDD Partitions not distributed evenly to executors

Posted by Koert Kuipers <ko...@tresata.com>.
we ran into similar issues and it seems related to the new memory
management. can you try:
spark.memory.useLegacyMode = true


Re: RDD Partitions not distributed evenly to executors

Posted by Ted Yu <yu...@gmail.com>.
bq. the modifications do not touch the scheduler

If the changes can be ported over to 1.6.1, would you mind reproducing the
issue there?

I ask because the master branch changes very quickly. It would be good to narrow
down when the behavior you observed first appeared.

