You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Mike Hynes <91...@gmail.com> on 2016/04/23 01:40:30 UTC

Re: executor delay in Spark

Glad to hear that the problem was solvable! I have not seen delays of this
type for later stages in jobs run by spark-submit, but I do not think it
impossible if your stage has no lineage dependence on other RDDs.

I'm CC'ing the dev list to report of other users observing load imbalance
caused by unusual initial task scheduling. I don't know of ways to avoid
this other than creating a dummy task to synchronize the executors, but
hopefully someone from there can suggest other possibilities.

Mike
On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m....@gmail.com>
wrote:

> Mike,
>
> It turns out the executor delay, as you mentioned, is the cause. After we
> introduced a dummy stage, partitioning was working fine. Does this delay
> happen during later stages as well? We noticed the same behavior
> (partitioning happens on spark-shell but not through spark-submit) at a
> later stage also.
>
> Apart from introducing a dummy stage or running it from spark-shell, is
> there any other option to fix this?
>
> Regards,
> Raghava.
>
>
> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>
>> When submitting a job with spark-submit, I've observed delays (up to
>> 1--2 seconds) for the executors to respond to the driver in order to
>> receive tasks in the first stage. The delay does not persist once the
>> executors have been synchronized.
>>
>> When the tasks are very short, as may be your case (relatively small
>> data and a simple map task like you have described), the 8 tasks in
>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> the second executor won't have responded to the master before the
>> first 4 tasks on the first executor have completed.
>>
>> To see if this is the cause in your particular case, you could try the
>> following to confirm:
>>         1. Examine the starting times of the tasks alongside their
>> executor
>>         2. Make a "dummy" stage execute before your real stages to
>> synchronize the executors by creating and materializing any random RDD
>>         3. Make the tasks longer, i.e. with some silly computational work.
>>
>> Mike
>>
>>
>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>> > Yes its the same data.
>> >
>> > 1) The number of partitions are the same (8, which is an argument to the
>> > HashPartitioner). In the first case, these partitions are spread across
>> > both the worker nodes. In the second case, all the partitions are on the
>> > same node.
>> > 2) What resources would be of interest here? Scala shell takes the
>> default
>> > parameters since we use "bin/spark-shell --master <master-URL>" to run
>> the
>> > scala-shell. For the scala program, we do set some configuration options
>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> > serializer.
>> >
>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> > RAM.1 executor runs on each worker node. Following configuration options
>> > are set for the scala program -- perhaps we should move it to the spark
>> > config file.
>> >
>> > Driver memory and executor memory are set to 12GB
>> > parallelism is set to 8
>> > Kryo serializer is used
>> > Number of retainedJobs and retainedStages has been increased to check
>> them
>> > in the UI.
>> >
>> > What information regarding Spark Context would be of interest here?
>> >
>> > Regards,
>> > Raghava.
>> >
>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>> wrote:
>> >
>> >> If the data file is same then it should have similar distribution of
>> >> keys.
>> >> Few queries-
>> >>
>> >> 1. Did you compare the number of partitions in both the cases?
>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> Program being submitted?
>> >>
>> >> Also, can you please share the details of Spark Context, Environment
>> and
>> >> Executors when you run via Scala program?
>> >>
>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> >> m.vijayaraghava@gmail.com> wrote:
>> >>
>> >>> Hello All,
>> >>>
>> >>> We are using HashPartitioner in the following way on a 3 node cluster
>> (1
>> >>> master and 2 worker nodes).
>> >>>
>> >>> val u =
>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>> (y.toInt,
>> >>> x.toInt) } }).partitionBy(new
>> HashPartitioner(8)).setName("u").persist()
>> >>>
>> >>> u.count()
>> >>>
>> >>> If we run this from the spark shell, the data (52 MB) is split across
>> >>> the
>> >>> two worker nodes. But if we put this in a scala program and run it,
>> then
>> >>> all the data goes to only one node. We have run it multiple times, but
>> >>> this
>> >>> behavior does not change. This seems strange.
>> >>>
>> >>> Is there some problem with the way we use HashPartitioner?
>> >>>
>> >>> Thanks in advance.
>> >>>
>> >>> Regards,
>> >>> Raghava.
>> >>>
>> >>
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > Raghava
>> > http://raghavam.github.io
>> >
>>
>>
>> --
>> Thanks,
>> Mike
>>
>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>

Re: executor delay in Spark

Posted by Raghava Mutharaju <m....@gmail.com>.
Thank you. For now we plan to use spark-shell to submit jobs.

Regards,
Raghava.


On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:

> Glad to hear that the problem was solvable! I have not seen delays of this
> type for later stages in jobs run by spark-submit, but I do not think it
> impossible if your stage has no lineage dependence on other RDDs.
>
> I'm CC'ing the dev list to report of other users observing load imbalance
> caused by unusual initial task scheduling. I don't know of ways to avoid
> this other than creating a dummy task to synchronize the executors, but
> hopefully someone from there can suggest other possibilities.
>
> Mike
> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m....@gmail.com>
> wrote:
>
>> Mike,
>>
>> It turns out the executor delay, as you mentioned, is the cause. After we
>> introduced a dummy stage, partitioning was working fine. Does this delay
>> happen during later stages as well? We noticed the same behavior
>> (partitioning happens on spark-shell but not through spark-submit) at a
>> later stage also.
>>
>> Apart from introducing a dummy stage or running it from spark-shell, is
>> there any other option to fix this?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>>
>>> When submitting a job with spark-submit, I've observed delays (up to
>>> 1--2 seconds) for the executors to respond to the driver in order to
>>> receive tasks in the first stage. The delay does not persist once the
>>> executors have been synchronized.
>>>
>>> When the tasks are very short, as may be your case (relatively small
>>> data and a simple map task like you have described), the 8 tasks in
>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> the second executor won't have responded to the master before the
>>> first 4 tasks on the first executor have completed.
>>>
>>> To see if this is the cause in your particular case, you could try the
>>> following to confirm:
>>>         1. Examine the starting times of the tasks alongside their
>>> executor
>>>         2. Make a "dummy" stage execute before your real stages to
>>> synchronize the executors by creating and materializing any random RDD
>>>         3. Make the tasks longer, i.e. with some silly computational
>>> work.
>>>
>>> Mike
>>>
>>>
>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>>> > Yes its the same data.
>>> >
>>> > 1) The number of partitions are the same (8, which is an argument to
>>> the
>>> > HashPartitioner). In the first case, these partitions are spread across
>>> > both the worker nodes. In the second case, all the partitions are on
>>> the
>>> > same node.
>>> > 2) What resources would be of interest here? Scala shell takes the
>>> default
>>> > parameters since we use "bin/spark-shell --master <master-URL>" to run
>>> the
>>> > scala-shell. For the scala program, we do set some configuration
>>> options
>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>> > serializer.
>>> >
>>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>>> > RAM.1 executor runs on each worker node. Following configuration
>>> options
>>> > are set for the scala program -- perhaps we should move it to the spark
>>> > config file.
>>> >
>>> > Driver memory and executor memory are set to 12GB
>>> > parallelism is set to 8
>>> > Kryo serializer is used
>>> > Number of retainedJobs and retainedStages has been increased to check
>>> them
>>> > in the UI.
>>> >
>>> > What information regarding Spark Context would be of interest here?
>>> >
>>> > Regards,
>>> > Raghava.
>>> >
>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>>> wrote:
>>> >
>>> >> If the data file is same then it should have similar distribution of
>>> >> keys.
>>> >> Few queries-
>>> >>
>>> >> 1. Did you compare the number of partitions in both the cases?
>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> >> Program being submitted?
>>> >>
>>> >> Also, can you please share the details of Spark Context, Environment
>>> and
>>> >> Executors when you run via Scala program?
>>> >>
>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> >> m.vijayaraghava@gmail.com> wrote:
>>> >>
>>> >>> Hello All,
>>> >>>
>>> >>> We are using HashPartitioner in the following way on a 3 node
>>> cluster (1
>>> >>> master and 2 worker nodes).
>>> >>>
>>> >>> val u =
>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>> (y.toInt,
>>> >>> x.toInt) } }).partitionBy(new
>>> HashPartitioner(8)).setName("u").persist()
>>> >>>
>>> >>> u.count()
>>> >>>
>>> >>> If we run this from the spark shell, the data (52 MB) is split across
>>> >>> the
>>> >>> two worker nodes. But if we put this in a scala program and run it,
>>> then
>>> >>> all the data goes to only one node. We have run it multiple times,
>>> but
>>> >>> this
>>> >>> behavior does not change. This seems strange.
>>> >>>
>>> >>> Is there some problem with the way we use HashPartitioner?
>>> >>>
>>> >>> Thanks in advance.
>>> >>>
>>> >>> Regards,
>>> >>> Raghava.
>>> >>>
>>> >>
>>> >>
>>> >
>>> >
>>> > --
>>> > Regards,
>>> > Raghava
>>> > http://raghavam.github.io
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Mike
>>>
>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>


-- 
Regards,
Raghava
http://raghavam.github.io

Re: executor delay in Spark

Posted by Jeff Zhang <zj...@gmail.com>.
Maybe this is due to config spark.scheduler.minRegisteredResourcesRatio,
you can try set it as 1 to see the behavior.


// Submit tasks only after (registered resources / total expected resources)

// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
  math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))


On Mon, Apr 25, 2016 at 7:17 AM, Mike Hynes <91...@gmail.com> wrote:

> Could you change numPartitions to {16, 32, 64} and run your program for
> each to see how many partitions are allocated to each worker? Let's see if
> you experience an all-nothing imbalance that way; if so, my guess is that
> something else is odd in your program logic or spark runtime environment,
> but if not and your executors all receive at least *some* partitions, then
> I still wouldn't rule out effects of scheduling delay. It's a simple test,
> but it could give some insight.
>
> Mike
>
> his could still be a  scheduling  If only one has *all* partitions,  and
> email me the log file? (If it's 10+ MB, just the first few thousand lines
> are fine).
> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m....@gmail.com>
> wrote:
>
>> Mike, All,
>>
>> It turns out that the second time we encountered the uneven-partition
>> issue is not due to spark-submit. It was resolved with the change in
>> placement of count().
>>
>> Case-1:
>>
>> val numPartitions = 8
>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>                              .values
>>                              .distinct(numPartitions)
>>                              .partitionBy(hashPartitioner)
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>
>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>                                        .count()
>>
>> <more transformations and actions>
>>
>> currDeltaURule1 RDD results in all the data on one node (there are 2
>> worker nodes). If we move count(), the uneven partition issue is resolved.
>>
>> Case-2:
>>
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>
>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>                                        <count not run here>
>>
>> <some more transformations that don't affect currDeltaURule1 rdd>
>>
>> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets
>> executed. This resolved the uneven partitioning issue.
>>
>> I don't see why the moving of an action to a later part in the code would
>> affect the partitioning. Are there other factors at play here that affect
>> the partitioning?
>>
>> (Inconsistent) uneven partitioning leads to one machine getting over
>> burdened (memory and number of tasks). We see a clear improvement in
>> runtime when the partitioning is even (happens when count is moved).
>>
>> Any pointers in figuring out this issue is much appreciated.
>>
>> Regards,
>> Raghava.
>>
>>
>>
>>
>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:
>>
>>> Glad to hear that the problem was solvable! I have not seen delays of
>>> this type for later stages in jobs run by spark-submit, but I do not think
>>> it impossible if your stage has no lineage dependence on other RDDs.
>>>
>>> I'm CC'ing the dev list to report of other users observing load
>>> imbalance caused by unusual initial task scheduling. I don't know of ways
>>> to avoid this other than creating a dummy task to synchronize the
>>> executors, but hopefully someone from there can suggest other possibilities.
>>>
>>> Mike
>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m....@gmail.com>
>>> wrote:
>>>
>>>> Mike,
>>>>
>>>> It turns out the executor delay, as you mentioned, is the cause. After
>>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>>> happen during later stages as well? We noticed the same behavior
>>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>>> later stage also.
>>>>
>>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>>> there any other option to fix this?
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>>
>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>>>>
>>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>>> receive tasks in the first stage. The delay does not persist once the
>>>>> executors have been synchronized.
>>>>>
>>>>> When the tasks are very short, as may be your case (relatively small
>>>>> data and a simple map task like you have described), the 8 tasks in
>>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>>> the second executor won't have responded to the master before the
>>>>> first 4 tasks on the first executor have completed.
>>>>>
>>>>> To see if this is the cause in your particular case, you could try the
>>>>> following to confirm:
>>>>>         1. Examine the starting times of the tasks alongside their
>>>>> executor
>>>>>         2. Make a "dummy" stage execute before your real stages to
>>>>> synchronize the executors by creating and materializing any random RDD
>>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>>> work.
>>>>>
>>>>> Mike
>>>>>
>>>>>
>>>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>>>>> > Yes its the same data.
>>>>> >
>>>>> > 1) The number of partitions are the same (8, which is an argument to
>>>>> the
>>>>> > HashPartitioner). In the first case, these partitions are spread
>>>>> across
>>>>> > both the worker nodes. In the second case, all the partitions are on
>>>>> the
>>>>> > same node.
>>>>> > 2) What resources would be of interest here? Scala shell takes the
>>>>> default
>>>>> > parameters since we use "bin/spark-shell --master <master-URL>" to
>>>>> run the
>>>>> > scala-shell. For the scala program, we do set some configuration
>>>>> options
>>>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>>>> > serializer.
>>>>> >
>>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>>> 14GB
>>>>> > RAM.1 executor runs on each worker node. Following configuration
>>>>> options
>>>>> > are set for the scala program -- perhaps we should move it to the
>>>>> spark
>>>>> > config file.
>>>>> >
>>>>> > Driver memory and executor memory are set to 12GB
>>>>> > parallelism is set to 8
>>>>> > Kryo serializer is used
>>>>> > Number of retainedJobs and retainedStages has been increased to
>>>>> check them
>>>>> > in the UI.
>>>>> >
>>>>> > What information regarding Spark Context would be of interest here?
>>>>> >
>>>>> > Regards,
>>>>> > Raghava.
>>>>> >
>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> >> If the data file is same then it should have similar distribution of
>>>>> >> keys.
>>>>> >> Few queries-
>>>>> >>
>>>>> >> 1. Did you compare the number of partitions in both the cases?
>>>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>>>> >> Program being submitted?
>>>>> >>
>>>>> >> Also, can you please share the details of Spark Context,
>>>>> Environment and
>>>>> >> Executors when you run via Scala program?
>>>>> >>
>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>>> >> m.vijayaraghava@gmail.com> wrote:
>>>>> >>
>>>>> >>> Hello All,
>>>>> >>>
>>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>>> cluster (1
>>>>> >>> master and 2 worker nodes).
>>>>> >>>
>>>>> >>> val u =
>>>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>>>> (y.toInt,
>>>>> >>> x.toInt) } }).partitionBy(new
>>>>> HashPartitioner(8)).setName("u").persist()
>>>>> >>>
>>>>> >>> u.count()
>>>>> >>>
>>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>>> across
>>>>> >>> the
>>>>> >>> two worker nodes. But if we put this in a scala program and run
>>>>> it, then
>>>>> >>> all the data goes to only one node. We have run it multiple times,
>>>>> but
>>>>> >>> this
>>>>> >>> behavior does not change. This seems strange.
>>>>> >>>
>>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>>> >>>
>>>>> >>> Thanks in advance.
>>>>> >>>
>>>>> >>> Regards,
>>>>> >>> Raghava.
>>>>> >>>
>>>>> >>
>>>>> >>
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Regards,
>>>>> > Raghava
>>>>> > http://raghavam.github.io
>>>>> >
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Mike
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Raghava
>>>> http://raghavam.github.io
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>


-- 
Best Regards

Jeff Zhang

Re: executor delay in Spark

Posted by Jeff Zhang <zj...@gmail.com>.
Maybe this is due to config spark.scheduler.minRegisteredResourcesRatio,
you can try set it as 1 to see the behavior.


// Submit tasks only after (registered resources / total expected resources)

// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
  math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))


On Mon, Apr 25, 2016 at 7:17 AM, Mike Hynes <91...@gmail.com> wrote:

> Could you change numPartitions to {16, 32, 64} and run your program for
> each to see how many partitions are allocated to each worker? Let's see if
> you experience an all-nothing imbalance that way; if so, my guess is that
> something else is odd in your program logic or spark runtime environment,
> but if not and your executors all receive at least *some* partitions, then
> I still wouldn't rule out effects of scheduling delay. It's a simple test,
> but it could give some insight.
>
> Mike
>
> his could still be a  scheduling  If only one has *all* partitions,  and
> email me the log file? (If it's 10+ MB, just the first few thousand lines
> are fine).
> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m....@gmail.com>
> wrote:
>
>> Mike, All,
>>
>> It turns out that the second time we encountered the uneven-partition
>> issue is not due to spark-submit. It was resolved with the change in
>> placement of count().
>>
>> Case-1:
>>
>> val numPartitions = 8
>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>                              .values
>>                              .distinct(numPartitions)
>>                              .partitionBy(hashPartitioner)
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>
>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>                                        .count()
>>
>> <more transformations and actions>
>>
>> currDeltaURule1 RDD results in all the data on one node (there are 2
>> worker nodes). If we move count(), the uneven partition issue is resolved.
>>
>> Case-2:
>>
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>
>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>                                        <count not run here>
>>
>> <some more transformations that don't affect currDeltaURule1 rdd>
>>
>> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets
>> executed. This resolved the uneven partitioning issue.
>>
>> I don't see why the moving of an action to a later part in the code would
>> affect the partitioning. Are there other factors at play here that affect
>> the partitioning?
>>
>> (Inconsistent) uneven partitioning leads to one machine getting over
>> burdened (memory and number of tasks). We see a clear improvement in
>> runtime when the partitioning is even (happens when count is moved).
>>
>> Any pointers in figuring out this issue is much appreciated.
>>
>> Regards,
>> Raghava.
>>
>>
>>
>>
>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:
>>
>>> Glad to hear that the problem was solvable! I have not seen delays of
>>> this type for later stages in jobs run by spark-submit, but I do not think
>>> it impossible if your stage has no lineage dependence on other RDDs.
>>>
>>> I'm CC'ing the dev list to report of other users observing load
>>> imbalance caused by unusual initial task scheduling. I don't know of ways
>>> to avoid this other than creating a dummy task to synchronize the
>>> executors, but hopefully someone from there can suggest other possibilities.
>>>
>>> Mike
>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m....@gmail.com>
>>> wrote:
>>>
>>>> Mike,
>>>>
>>>> It turns out the executor delay, as you mentioned, is the cause. After
>>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>>> happen during later stages as well? We noticed the same behavior
>>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>>> later stage also.
>>>>
>>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>>> there any other option to fix this?
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>>
>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>>>>
>>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>>> receive tasks in the first stage. The delay does not persist once the
>>>>> executors have been synchronized.
>>>>>
>>>>> When the tasks are very short, as may be your case (relatively small
>>>>> data and a simple map task like you have described), the 8 tasks in
>>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>>> the second executor won't have responded to the master before the
>>>>> first 4 tasks on the first executor have completed.
>>>>>
>>>>> To see if this is the cause in your particular case, you could try the
>>>>> following to confirm:
>>>>>         1. Examine the starting times of the tasks alongside their
>>>>> executor
>>>>>         2. Make a "dummy" stage execute before your real stages to
>>>>> synchronize the executors by creating and materializing any random RDD
>>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>>> work.
>>>>>
>>>>> Mike
>>>>>
>>>>>
>>>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>>>>> > Yes its the same data.
>>>>> >
>>>>> > 1) The number of partitions are the same (8, which is an argument to
>>>>> the
>>>>> > HashPartitioner). In the first case, these partitions are spread
>>>>> across
>>>>> > both the worker nodes. In the second case, all the partitions are on
>>>>> the
>>>>> > same node.
>>>>> > 2) What resources would be of interest here? Scala shell takes the
>>>>> default
>>>>> > parameters since we use "bin/spark-shell --master <master-URL>" to
>>>>> run the
>>>>> > scala-shell. For the scala program, we do set some configuration
>>>>> options
>>>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>>>> > serializer.
>>>>> >
>>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>>> 14GB
>>>>> > RAM.1 executor runs on each worker node. Following configuration
>>>>> options
>>>>> > are set for the scala program -- perhaps we should move it to the
>>>>> spark
>>>>> > config file.
>>>>> >
>>>>> > Driver memory and executor memory are set to 12GB
>>>>> > parallelism is set to 8
>>>>> > Kryo serializer is used
>>>>> > Number of retainedJobs and retainedStages has been increased to
>>>>> check them
>>>>> > in the UI.
>>>>> >
>>>>> > What information regarding Spark Context would be of interest here?
>>>>> >
>>>>> > Regards,
>>>>> > Raghava.
>>>>> >
>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> >> If the data file is same then it should have similar distribution of
>>>>> >> keys.
>>>>> >> Few queries-
>>>>> >>
>>>>> >> 1. Did you compare the number of partitions in both the cases?
>>>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>>>> >> Program being submitted?
>>>>> >>
>>>>> >> Also, can you please share the details of Spark Context,
>>>>> Environment and
>>>>> >> Executors when you run via Scala program?
>>>>> >>
>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>>> >> m.vijayaraghava@gmail.com> wrote:
>>>>> >>
>>>>> >>> Hello All,
>>>>> >>>
>>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>>> cluster (1
>>>>> >>> master and 2 worker nodes).
>>>>> >>>
>>>>> >>> val u =
>>>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>>>> (y.toInt,
>>>>> >>> x.toInt) } }).partitionBy(new
>>>>> HashPartitioner(8)).setName("u").persist()
>>>>> >>>
>>>>> >>> u.count()
>>>>> >>>
>>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>>> across
>>>>> >>> the
>>>>> >>> two worker nodes. But if we put this in a scala program and run
>>>>> it, then
>>>>> >>> all the data goes to only one node. We have run it multiple times,
>>>>> but
>>>>> >>> this
>>>>> >>> behavior does not change. This seems strange.
>>>>> >>>
>>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>>> >>>
>>>>> >>> Thanks in advance.
>>>>> >>>
>>>>> >>> Regards,
>>>>> >>> Raghava.
>>>>> >>>
>>>>> >>
>>>>> >>
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Regards,
>>>>> > Raghava
>>>>> > http://raghavam.github.io
>>>>> >
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Mike
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Raghava
>>>> http://raghavam.github.io
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>


-- 
Best Regards

Jeff Zhang

Re: executor delay in Spark

Posted by Raghava Mutharaju <m....@gmail.com>.
Hello Mike,

No problem, logs are useful to us anyway. Thank you for all the pointers.
We started off with examining only a single RDD but later on added a few
more. The persist count and unpersist count sequence is the dummy stage
that you suggested us to use to avoid the initial scheduler delay.

Our issue is very similar to the one you posted:
http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-td16988.html.
We tried with spark.shuffle.reduceLocality.enabled=false and it helps in
certain cases. Were you able to fix that issue? We use Spark 1.6.0

We noticed the following
1) persisting an RDD seems to lead to unbalanced distribution of partitions
across the executors.
2) If one RDD has an all-nothing skew then rest of the RDDs that depend on
it also get all-nothing skew.

Regards,
Raghava.


On Wed, Apr 27, 2016 at 10:20 AM, Mike Hynes <91...@gmail.com> wrote:

> Hi Raghava,
>
> I'm terribly sorry about the end of my last email; that garbled
> sentence was garbled because it wasn't meant to exist; I wrote it on
> my phone, realized I wouldn't realistically have time to look into
> another set of logs deeply enough, and then mistook myself for having
> deleted it. Again, I'm very sorry for my error here.
>
> I did peek at your code, though, and think you could try the following:
> 0. The actions in your main method are many, and it will be hard to
> isolate a problem; I would recommend only examing *one* RDD at first,
> rather than six.
> 1. There is a lot of repetition for reading RDDs from textfiles
> sequentially; if you put those lines into two methods depending on RDD
> type, you will at least have one entry point to work with once you
> make a simplified test program.
> 2. In one part you persist, count, immediately unpersist, and then
> count again an RDD.. I'm not acquainted with this idiom, and I don't
> understand what that is to achieve. It strikes me suspect for
> triggering unusual garbage collection, which would, I think, only
> complicate your trace debugging.
>
> I've attached a python script that dumps relevant info from the Spark
> JSON logs into a CSV for easier analysis in you language of choice;
> hopefully it can aid in finer grained debugging (the headers of the
> fields it prints are listed in one of the functions).
>
> Mike
>
> On 4/25/16, Raghava Mutharaju <m....@gmail.com> wrote:
> > Mike,
> >
> > We ran our program with 16, 32 and 64 partitions. The behavior was same
> as
> > before with 8 partitions. It was mixed -- for some RDDs we see an
> > all-nothing skew, but for others we see them getting split across the 2
> > worker nodes. In some cases, they start with even split and in later
> > iterations it goes to all-nothing split. Please find the logs attached.
> >
> > our program source code:
> >
> https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala
> >
> > We put in persist() statements for different RDDs just to check their
> skew.
> >
> > @Jeff, setting minRegisteredResourcesRatio did not help. Behavior was
> same
> > as before.
> >
> > Thank you for your time.
> >
> > Regards,
> > Raghava.
> >
> >
> > On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91...@gmail.com> wrote:
> >
> >> Could you change numPartitions to {16, 32, 64} and run your program for
> >> each to see how many partitions are allocated to each worker? Let's see
> >> if
> >> you experience an all-nothing imbalance that way; if so, my guess is
> that
> >> something else is odd in your program logic or spark runtime
> environment,
> >> but if not and your executors all receive at least *some* partitions,
> >> then
> >> I still wouldn't rule out effects of scheduling delay. It's a simple
> >> test,
> >> but it could give some insight.
> >>
> >> Mike
> >>
> >> his could still be a  scheduling  If only one has *all* partitions,  and
> >> email me the log file? (If it's 10+ MB, just the first few thousand
> lines
> >> are fine).
> >> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m.vijayaraghava@gmail.com
> >
> >> wrote:
> >>
> >>> Mike, All,
> >>>
> >>> It turns out that the second time we encountered the uneven-partition
> >>> issue is not due to spark-submit. It was resolved with the change in
> >>> placement of count().
> >>>
> >>> Case-1:
> >>>
> >>> val numPartitions = 8
> >>> // read uAxioms from HDFS, use hash partitioner on it and persist it
> >>> // read type1Axioms from HDFS, use hash partitioner on it and persist
> it
> >>> currDeltaURule1 = type1Axioms.join(uAxioms)
> >>>                              .values
> >>>                              .distinct(numPartitions)
> >>>                              .partitionBy(hashPartitioner)
> >>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
> >>>
> >>>  .persist(StorageLevel.MEMORY_AND_DISK)
> >>>                                        .count()
> >>>
> >>> <more transformations and actions>
> >>>
> >>> currDeltaURule1 RDD results in all the data on one node (there are 2
> >>> worker nodes). If we move count(), the uneven partition issue is
> >>> resolved.
> >>>
> >>> Case-2:
> >>>
> >>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
> >>>
> >>>  .persist(StorageLevel.MEMORY_AND_DISK)
> >>>                                        <count not run here>
> >>>
> >>> <some more transformations that don't affect currDeltaURule1 rdd>
> >>>
> >>> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets
> >>> executed. This resolved the uneven partitioning issue.
> >>>
> >>> I don't see why the moving of an action to a later part in the code
> >>> would
> >>> affect the partitioning. Are there other factors at play here that
> >>> affect
> >>> the partitioning?
> >>>
> >>> (Inconsistent) uneven partitioning leads to one machine getting over
> >>> burdened (memory and number of tasks). We see a clear improvement in
> >>> runtime when the partitioning is even (happens when count is moved).
> >>>
> >>> Any pointers in figuring out this issue is much appreciated.
> >>>
> >>> Regards,
> >>> Raghava.
> >>>
> >>>
> >>>
> >>>
> >>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:
> >>>
> >>>> Glad to hear that the problem was solvable! I have not seen delays of
> >>>> this type for later stages in jobs run by spark-submit, but I do not
> >>>> think
> >>>> it impossible if your stage has no lineage dependence on other RDDs.
> >>>>
> >>>> I'm CC'ing the dev list to report of other users observing load
> >>>> imbalance caused by unusual initial task scheduling. I don't know of
> >>>> ways
> >>>> to avoid this other than creating a dummy task to synchronize the
> >>>> executors, but hopefully someone from there can suggest other
> >>>> possibilities.
> >>>>
> >>>> Mike
> >>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju"
> >>>> <m....@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Mike,
> >>>>>
> >>>>> It turns out the executor delay, as you mentioned, is the cause.
> After
> >>>>> we introduced a dummy stage, partitioning was working fine. Does this
> >>>>> delay
> >>>>> happen during later stages as well? We noticed the same behavior
> >>>>> (partitioning happens on spark-shell but not through spark-submit) at
> >>>>> a
> >>>>> later stage also.
> >>>>>
> >>>>> Apart from introducing a dummy stage or running it from spark-shell,
> >>>>> is
> >>>>> there any other option to fix this?
> >>>>>
> >>>>> Regards,
> >>>>> Raghava.
> >>>>>
> >>>>>
> >>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com>
> wrote:
> >>>>>
> >>>>>> When submitting a job with spark-submit, I've observed delays (up to
> >>>>>> 1--2 seconds) for the executors to respond to the driver in order to
> >>>>>> receive tasks in the first stage. The delay does not persist once
> the
> >>>>>> executors have been synchronized.
> >>>>>>
> >>>>>> When the tasks are very short, as may be your case (relatively small
> >>>>>> data and a simple map task like you have described), the 8 tasks in
> >>>>>> your stage may be allocated to only 1 executor in 2 waves of 4,
> since
> >>>>>> the second executor won't have responded to the master before the
> >>>>>> first 4 tasks on the first executor have completed.
> >>>>>>
> >>>>>> To see if this is the cause in your particular case, you could try
> >>>>>> the
> >>>>>> following to confirm:
> >>>>>>         1. Examine the starting times of the tasks alongside their
> >>>>>> executor
> >>>>>>         2. Make a "dummy" stage execute before your real stages to
> >>>>>> synchronize the executors by creating and materializing any random
> >>>>>> RDD
> >>>>>>         3. Make the tasks longer, i.e. with some silly computational
> >>>>>> work.
> >>>>>>
> >>>>>> Mike
> >>>>>>
> >>>>>>
> >>>>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
> >>>>>> > Yes its the same data.
> >>>>>> >
> >>>>>> > 1) The number of partitions are the same (8, which is an argument
> >>>>>> > to
> >>>>>> the
> >>>>>> > HashPartitioner). In the first case, these partitions are spread
> >>>>>> across
> >>>>>> > both the worker nodes. In the second case, all the partitions are
> >>>>>> > on
> >>>>>> the
> >>>>>> > same node.
> >>>>>> > 2) What resources would be of interest here? Scala shell takes the
> >>>>>> default
> >>>>>> > parameters since we use "bin/spark-shell --master <master-URL>" to
> >>>>>> run the
> >>>>>> > scala-shell. For the scala program, we do set some configuration
> >>>>>> options
> >>>>>> > such as driver memory (12GB), parallelism is set to 8 and we use
> >>>>>> > Kryo
> >>>>>> > serializer.
> >>>>>> >
> >>>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
> >>>>>> 14GB
> >>>>>> > RAM.1 executor runs on each worker node. Following configuration
> >>>>>> options
> >>>>>> > are set for the scala program -- perhaps we should move it to the
> >>>>>> spark
> >>>>>> > config file.
> >>>>>> >
> >>>>>> > Driver memory and executor memory are set to 12GB
> >>>>>> > parallelism is set to 8
> >>>>>> > Kryo serializer is used
> >>>>>> > Number of retainedJobs and retainedStages has been increased to
> >>>>>> check them
> >>>>>> > in the UI.
> >>>>>> >
> >>>>>> > What information regarding Spark Context would be of interest
> here?
> >>>>>> >
> >>>>>> > Regards,
> >>>>>> > Raghava.
> >>>>>> >
> >>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
> >>>>>> wrote:
> >>>>>> >
> >>>>>> >> If the data file is same then it should have similar distribution
> >>>>>> >> of
> >>>>>> >> keys.
> >>>>>> >> Few queries-
> >>>>>> >>
> >>>>>> >> 1. Did you compare the number of partitions in both the cases?
> >>>>>> >> 2. Did you compare the resource allocation for Spark Shell vs
> >>>>>> >> Scala
> >>>>>> >> Program being submitted?
> >>>>>> >>
> >>>>>> >> Also, can you please share the details of Spark Context,
> >>>>>> Environment and
> >>>>>> >> Executors when you run via Scala program?
> >>>>>> >>
> >>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >>>>>> >> m.vijayaraghava@gmail.com> wrote:
> >>>>>> >>
> >>>>>> >>> Hello All,
> >>>>>> >>>
> >>>>>> >>> We are using HashPartitioner in the following way on a 3 node
> >>>>>> cluster (1
> >>>>>> >>> master and 2 worker nodes).
> >>>>>> >>>
> >>>>>> >>> val u =
> >>>>>> >>>
> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
> >>>>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
> >>>>>> (y.toInt,
> >>>>>> >>> x.toInt) } }).partitionBy(new
> >>>>>> HashPartitioner(8)).setName("u").persist()
> >>>>>> >>>
> >>>>>> >>> u.count()
> >>>>>> >>>
> >>>>>> >>> If we run this from the spark shell, the data (52 MB) is split
> >>>>>> across
> >>>>>> >>> the
> >>>>>> >>> two worker nodes. But if we put this in a scala program and run
> >>>>>> it, then
> >>>>>> >>> all the data goes to only one node. We have run it multiple
> >>>>>> >>> times,
> >>>>>> but
> >>>>>> >>> this
> >>>>>> >>> behavior does not change. This seems strange.
> >>>>>> >>>
> >>>>>> >>> Is there some problem with the way we use HashPartitioner?
> >>>>>> >>>
> >>>>>> >>> Thanks in advance.
> >>>>>> >>>
> >>>>>> >>> Regards,
> >>>>>> >>> Raghava.
> >>>>>> >>>
> >>>>>> >>
> >>>>>> >>
> >>>>>> >
> >>>>>> >
> >>>>>> > --
> >>>>>> > Regards,
> >>>>>> > Raghava
> >>>>>> > http://raghavam.github.io
> >>>>>> >
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Thanks,
> >>>>>> Mike
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Regards,
> >>>>> Raghava
> >>>>> http://raghavam.github.io
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> Regards,
> >>> Raghava
> >>> http://raghavam.github.io
> >>>
> >>
> >
> >
> > --
> > Regards,
> > Raghava
> > http://raghavam.github.io
> >
>
>
> --
> Thanks,
> Mike
>



-- 
Regards,
Raghava
http://raghavam.github.io

Re: executor delay in Spark

Posted by Mike Hynes <91...@gmail.com>.
Hi Raghava,

I'm terribly sorry about the end of my last email; that garbled
sentence was garbled because it wasn't meant to exist; I wrote it on
my phone, realized I wouldn't realistically have time to look into
another set of logs deeply enough, and then mistook myself for having
deleted it. Again, I'm very sorry for my error here.

I did peek at your code, though, and think you could try the following:
0. The actions in your main method are many, and it will be hard to
isolate a problem; I would recommend only examing *one* RDD at first,
rather than six.
1. There is a lot of repetition for reading RDDs from textfiles
sequentially; if you put those lines into two methods depending on RDD
type, you will at least have one entry point to work with once you
make a simplified test program.
2. In one part you persist, count, immediately unpersist, and then
count again an RDD.. I'm not acquainted with this idiom, and I don't
understand what that is to achieve. It strikes me suspect for
triggering unusual garbage collection, which would, I think, only
complicate your trace debugging.

I've attached a python script that dumps relevant info from the Spark
JSON logs into a CSV for easier analysis in you language of choice;
hopefully it can aid in finer grained debugging (the headers of the
fields it prints are listed in one of the functions).

Mike

On 4/25/16, Raghava Mutharaju <m....@gmail.com> wrote:
> Mike,
>
> We ran our program with 16, 32 and 64 partitions. The behavior was same as
> before with 8 partitions. It was mixed -- for some RDDs we see an
> all-nothing skew, but for others we see them getting split across the 2
> worker nodes. In some cases, they start with even split and in later
> iterations it goes to all-nothing split. Please find the logs attached.
>
> our program source code:
> https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala
>
> We put in persist() statements for different RDDs just to check their skew.
>
> @Jeff, setting minRegisteredResourcesRatio did not help. Behavior was same
> as before.
>
> Thank you for your time.
>
> Regards,
> Raghava.
>
>
> On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91...@gmail.com> wrote:
>
>> Could you change numPartitions to {16, 32, 64} and run your program for
>> each to see how many partitions are allocated to each worker? Let's see
>> if
>> you experience an all-nothing imbalance that way; if so, my guess is that
>> something else is odd in your program logic or spark runtime environment,
>> but if not and your executors all receive at least *some* partitions,
>> then
>> I still wouldn't rule out effects of scheduling delay. It's a simple
>> test,
>> but it could give some insight.
>>
>> Mike
>>
>> his could still be a  scheduling  If only one has *all* partitions,  and
>> email me the log file? (If it's 10+ MB, just the first few thousand lines
>> are fine).
>> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m....@gmail.com>
>> wrote:
>>
>>> Mike, All,
>>>
>>> It turns out that the second time we encountered the uneven-partition
>>> issue is not due to spark-submit. It was resolved with the change in
>>> placement of count().
>>>
>>> Case-1:
>>>
>>> val numPartitions = 8
>>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>>                              .values
>>>                              .distinct(numPartitions)
>>>                              .partitionBy(hashPartitioner)
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>
>>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>>                                        .count()
>>>
>>> <more transformations and actions>
>>>
>>> currDeltaURule1 RDD results in all the data on one node (there are 2
>>> worker nodes). If we move count(), the uneven partition issue is
>>> resolved.
>>>
>>> Case-2:
>>>
>>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>>
>>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>>                                        <count not run here>
>>>
>>> <some more transformations that don't affect currDeltaURule1 rdd>
>>>
>>> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets
>>> executed. This resolved the uneven partitioning issue.
>>>
>>> I don't see why the moving of an action to a later part in the code
>>> would
>>> affect the partitioning. Are there other factors at play here that
>>> affect
>>> the partitioning?
>>>
>>> (Inconsistent) uneven partitioning leads to one machine getting over
>>> burdened (memory and number of tasks). We see a clear improvement in
>>> runtime when the partitioning is even (happens when count is moved).
>>>
>>> Any pointers in figuring out this issue is much appreciated.
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>>
>>>
>>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:
>>>
>>>> Glad to hear that the problem was solvable! I have not seen delays of
>>>> this type for later stages in jobs run by spark-submit, but I do not
>>>> think
>>>> it impossible if your stage has no lineage dependence on other RDDs.
>>>>
>>>> I'm CC'ing the dev list to report of other users observing load
>>>> imbalance caused by unusual initial task scheduling. I don't know of
>>>> ways
>>>> to avoid this other than creating a dummy task to synchronize the
>>>> executors, but hopefully someone from there can suggest other
>>>> possibilities.
>>>>
>>>> Mike
>>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju"
>>>> <m....@gmail.com>
>>>> wrote:
>>>>
>>>>> Mike,
>>>>>
>>>>> It turns out the executor delay, as you mentioned, is the cause. After
>>>>> we introduced a dummy stage, partitioning was working fine. Does this
>>>>> delay
>>>>> happen during later stages as well? We noticed the same behavior
>>>>> (partitioning happens on spark-shell but not through spark-submit) at
>>>>> a
>>>>> later stage also.
>>>>>
>>>>> Apart from introducing a dummy stage or running it from spark-shell,
>>>>> is
>>>>> there any other option to fix this?
>>>>>
>>>>> Regards,
>>>>> Raghava.
>>>>>
>>>>>
>>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>>>>>
>>>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>>>> receive tasks in the first stage. The delay does not persist once the
>>>>>> executors have been synchronized.
>>>>>>
>>>>>> When the tasks are very short, as may be your case (relatively small
>>>>>> data and a simple map task like you have described), the 8 tasks in
>>>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>>>> the second executor won't have responded to the master before the
>>>>>> first 4 tasks on the first executor have completed.
>>>>>>
>>>>>> To see if this is the cause in your particular case, you could try
>>>>>> the
>>>>>> following to confirm:
>>>>>>         1. Examine the starting times of the tasks alongside their
>>>>>> executor
>>>>>>         2. Make a "dummy" stage execute before your real stages to
>>>>>> synchronize the executors by creating and materializing any random
>>>>>> RDD
>>>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>>>> work.
>>>>>>
>>>>>> Mike
>>>>>>
>>>>>>
>>>>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>>>>>> > Yes its the same data.
>>>>>> >
>>>>>> > 1) The number of partitions are the same (8, which is an argument
>>>>>> > to
>>>>>> the
>>>>>> > HashPartitioner). In the first case, these partitions are spread
>>>>>> across
>>>>>> > both the worker nodes. In the second case, all the partitions are
>>>>>> > on
>>>>>> the
>>>>>> > same node.
>>>>>> > 2) What resources would be of interest here? Scala shell takes the
>>>>>> default
>>>>>> > parameters since we use "bin/spark-shell --master <master-URL>" to
>>>>>> run the
>>>>>> > scala-shell. For the scala program, we do set some configuration
>>>>>> options
>>>>>> > such as driver memory (12GB), parallelism is set to 8 and we use
>>>>>> > Kryo
>>>>>> > serializer.
>>>>>> >
>>>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>>>> 14GB
>>>>>> > RAM.1 executor runs on each worker node. Following configuration
>>>>>> options
>>>>>> > are set for the scala program -- perhaps we should move it to the
>>>>>> spark
>>>>>> > config file.
>>>>>> >
>>>>>> > Driver memory and executor memory are set to 12GB
>>>>>> > parallelism is set to 8
>>>>>> > Kryo serializer is used
>>>>>> > Number of retainedJobs and retainedStages has been increased to
>>>>>> check them
>>>>>> > in the UI.
>>>>>> >
>>>>>> > What information regarding Spark Context would be of interest here?
>>>>>> >
>>>>>> > Regards,
>>>>>> > Raghava.
>>>>>> >
>>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>>>>>> wrote:
>>>>>> >
>>>>>> >> If the data file is same then it should have similar distribution
>>>>>> >> of
>>>>>> >> keys.
>>>>>> >> Few queries-
>>>>>> >>
>>>>>> >> 1. Did you compare the number of partitions in both the cases?
>>>>>> >> 2. Did you compare the resource allocation for Spark Shell vs
>>>>>> >> Scala
>>>>>> >> Program being submitted?
>>>>>> >>
>>>>>> >> Also, can you please share the details of Spark Context,
>>>>>> Environment and
>>>>>> >> Executors when you run via Scala program?
>>>>>> >>
>>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>>>> >> m.vijayaraghava@gmail.com> wrote:
>>>>>> >>
>>>>>> >>> Hello All,
>>>>>> >>>
>>>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>>>> cluster (1
>>>>>> >>> master and 2 worker nodes).
>>>>>> >>>
>>>>>> >>> val u =
>>>>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>>>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>>>>> (y.toInt,
>>>>>> >>> x.toInt) } }).partitionBy(new
>>>>>> HashPartitioner(8)).setName("u").persist()
>>>>>> >>>
>>>>>> >>> u.count()
>>>>>> >>>
>>>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>>>> across
>>>>>> >>> the
>>>>>> >>> two worker nodes. But if we put this in a scala program and run
>>>>>> it, then
>>>>>> >>> all the data goes to only one node. We have run it multiple
>>>>>> >>> times,
>>>>>> but
>>>>>> >>> this
>>>>>> >>> behavior does not change. This seems strange.
>>>>>> >>>
>>>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>>>> >>>
>>>>>> >>> Thanks in advance.
>>>>>> >>>
>>>>>> >>> Regards,
>>>>>> >>> Raghava.
>>>>>> >>>
>>>>>> >>
>>>>>> >>
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > Regards,
>>>>>> > Raghava
>>>>>> > http://raghavam.github.io
>>>>>> >
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Mike
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Raghava
>>>>> http://raghavam.github.io
>>>>>
>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Raghava
>>> http://raghavam.github.io
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


-- 
Thanks,
Mike

Re: executor delay in Spark

Posted by Raghava Mutharaju <m....@gmail.com>.
Mike,

We ran our program with 16, 32 and 64 partitions. The behavior was same as
before with 8 partitions. It was mixed -- for some RDDs we see an
all-nothing skew, but for others we see them getting split across the 2
worker nodes. In some cases, they start with even split and in later
iterations it goes to all-nothing split. Please find the logs attached.

our program source code:
https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala

We put in persist() statements for different RDDs just to check their skew.

@Jeff, setting minRegisteredResourcesRatio did not help. Behavior was same
as before.

Thank you for your time.

Regards,
Raghava.


On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91...@gmail.com> wrote:

> Could you change numPartitions to {16, 32, 64} and run your program for
> each to see how many partitions are allocated to each worker? Let's see if
> you experience an all-nothing imbalance that way; if so, my guess is that
> something else is odd in your program logic or spark runtime environment,
> but if not and your executors all receive at least *some* partitions, then
> I still wouldn't rule out effects of scheduling delay. It's a simple test,
> but it could give some insight.
>
> Mike
>
> his could still be a  scheduling  If only one has *all* partitions,  and
> email me the log file? (If it's 10+ MB, just the first few thousand lines
> are fine).
> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m....@gmail.com>
> wrote:
>
>> Mike, All,
>>
>> It turns out that the second time we encountered the uneven-partition
>> issue is not due to spark-submit. It was resolved with the change in
>> placement of count().
>>
>> Case-1:
>>
>> val numPartitions = 8
>> // read uAxioms from HDFS, use hash partitioner on it and persist it
>> // read type1Axioms from HDFS, use hash partitioner on it and persist it
>> currDeltaURule1 = type1Axioms.join(uAxioms)
>>                              .values
>>                              .distinct(numPartitions)
>>                              .partitionBy(hashPartitioner)
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>
>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>                                        .count()
>>
>> <more transformations and actions>
>>
>> currDeltaURule1 RDD results in all the data on one node (there are 2
>> worker nodes). If we move count(), the uneven partition issue is resolved.
>>
>> Case-2:
>>
>> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>>
>>  .persist(StorageLevel.MEMORY_AND_DISK)
>>                                        <count not run here>
>>
>> <some more transformations that don't affect currDeltaURule1 rdd>
>>
>> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets
>> executed. This resolved the uneven partitioning issue.
>>
>> I don't see why the moving of an action to a later part in the code would
>> affect the partitioning. Are there other factors at play here that affect
>> the partitioning?
>>
>> (Inconsistent) uneven partitioning leads to one machine getting over
>> burdened (memory and number of tasks). We see a clear improvement in
>> runtime when the partitioning is even (happens when count is moved).
>>
>> Any pointers in figuring out this issue is much appreciated.
>>
>> Regards,
>> Raghava.
>>
>>
>>
>>
>> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:
>>
>>> Glad to hear that the problem was solvable! I have not seen delays of
>>> this type for later stages in jobs run by spark-submit, but I do not think
>>> it impossible if your stage has no lineage dependence on other RDDs.
>>>
>>> I'm CC'ing the dev list to report of other users observing load
>>> imbalance caused by unusual initial task scheduling. I don't know of ways
>>> to avoid this other than creating a dummy task to synchronize the
>>> executors, but hopefully someone from there can suggest other possibilities.
>>>
>>> Mike
>>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m....@gmail.com>
>>> wrote:
>>>
>>>> Mike,
>>>>
>>>> It turns out the executor delay, as you mentioned, is the cause. After
>>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>>> happen during later stages as well? We noticed the same behavior
>>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>>> later stage also.
>>>>
>>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>>> there any other option to fix this?
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>>
>>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>>>>
>>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>>> receive tasks in the first stage. The delay does not persist once the
>>>>> executors have been synchronized.
>>>>>
>>>>> When the tasks are very short, as may be your case (relatively small
>>>>> data and a simple map task like you have described), the 8 tasks in
>>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>>> the second executor won't have responded to the master before the
>>>>> first 4 tasks on the first executor have completed.
>>>>>
>>>>> To see if this is the cause in your particular case, you could try the
>>>>> following to confirm:
>>>>>         1. Examine the starting times of the tasks alongside their
>>>>> executor
>>>>>         2. Make a "dummy" stage execute before your real stages to
>>>>> synchronize the executors by creating and materializing any random RDD
>>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>>> work.
>>>>>
>>>>> Mike
>>>>>
>>>>>
>>>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>>>>> > Yes its the same data.
>>>>> >
>>>>> > 1) The number of partitions are the same (8, which is an argument to
>>>>> the
>>>>> > HashPartitioner). In the first case, these partitions are spread
>>>>> across
>>>>> > both the worker nodes. In the second case, all the partitions are on
>>>>> the
>>>>> > same node.
>>>>> > 2) What resources would be of interest here? Scala shell takes the
>>>>> default
>>>>> > parameters since we use "bin/spark-shell --master <master-URL>" to
>>>>> run the
>>>>> > scala-shell. For the scala program, we do set some configuration
>>>>> options
>>>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>>>> > serializer.
>>>>> >
>>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>>> 14GB
>>>>> > RAM.1 executor runs on each worker node. Following configuration
>>>>> options
>>>>> > are set for the scala program -- perhaps we should move it to the
>>>>> spark
>>>>> > config file.
>>>>> >
>>>>> > Driver memory and executor memory are set to 12GB
>>>>> > parallelism is set to 8
>>>>> > Kryo serializer is used
>>>>> > Number of retainedJobs and retainedStages has been increased to
>>>>> check them
>>>>> > in the UI.
>>>>> >
>>>>> > What information regarding Spark Context would be of interest here?
>>>>> >
>>>>> > Regards,
>>>>> > Raghava.
>>>>> >
>>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> >> If the data file is same then it should have similar distribution of
>>>>> >> keys.
>>>>> >> Few queries-
>>>>> >>
>>>>> >> 1. Did you compare the number of partitions in both the cases?
>>>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>>>> >> Program being submitted?
>>>>> >>
>>>>> >> Also, can you please share the details of Spark Context,
>>>>> Environment and
>>>>> >> Executors when you run via Scala program?
>>>>> >>
>>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>>> >> m.vijayaraghava@gmail.com> wrote:
>>>>> >>
>>>>> >>> Hello All,
>>>>> >>>
>>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>>> cluster (1
>>>>> >>> master and 2 worker nodes).
>>>>> >>>
>>>>> >>> val u =
>>>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>>>> (y.toInt,
>>>>> >>> x.toInt) } }).partitionBy(new
>>>>> HashPartitioner(8)).setName("u").persist()
>>>>> >>>
>>>>> >>> u.count()
>>>>> >>>
>>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>>> across
>>>>> >>> the
>>>>> >>> two worker nodes. But if we put this in a scala program and run
>>>>> it, then
>>>>> >>> all the data goes to only one node. We have run it multiple times,
>>>>> but
>>>>> >>> this
>>>>> >>> behavior does not change. This seems strange.
>>>>> >>>
>>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>>> >>>
>>>>> >>> Thanks in advance.
>>>>> >>>
>>>>> >>> Regards,
>>>>> >>> Raghava.
>>>>> >>>
>>>>> >>
>>>>> >>
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Regards,
>>>>> > Raghava
>>>>> > http://raghavam.github.io
>>>>> >
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Mike
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Raghava
>>>> http://raghavam.github.io
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>


-- 
Regards,
Raghava
http://raghavam.github.io

Re: executor delay in Spark

Posted by Mike Hynes <91...@gmail.com>.
Could you change numPartitions to {16, 32, 64} and run your program for
each to see how many partitions are allocated to each worker? Let's see if
you experience an all-nothing imbalance that way; if so, my guess is that
something else is odd in your program logic or spark runtime environment,
but if not and your executors all receive at least *some* partitions, then
I still wouldn't rule out effects of scheduling delay. It's a simple test,
but it could give some insight.

Mike

his could still be a  scheduling  If only one has *all* partitions,  and
email me the log file? (If it's 10+ MB, just the first few thousand lines
are fine).
On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m....@gmail.com>
wrote:

> Mike, All,
>
> It turns out that the second time we encountered the uneven-partition
> issue is not due to spark-submit. It was resolved with the change in
> placement of count().
>
> Case-1:
>
> val numPartitions = 8
> // read uAxioms from HDFS, use hash partitioner on it and persist it
> // read type1Axioms from HDFS, use hash partitioner on it and persist it
> currDeltaURule1 = type1Axioms.join(uAxioms)
>                              .values
>                              .distinct(numPartitions)
>                              .partitionBy(hashPartitioner)
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>
>  .persist(StorageLevel.MEMORY_AND_DISK)
>                                        .count()
>
> <more transformations and actions>
>
> currDeltaURule1 RDD results in all the data on one node (there are 2
> worker nodes). If we move count(), the uneven partition issue is resolved.
>
> Case-2:
>
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>
>  .persist(StorageLevel.MEMORY_AND_DISK)
>                                        <count not run here>
>
> <some more transformations that don't affect currDeltaURule1 rdd>
>
> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets executed.
> This resolved the uneven partitioning issue.
>
> I don't see why the moving of an action to a later part in the code would
> affect the partitioning. Are there other factors at play here that affect
> the partitioning?
>
> (Inconsistent) uneven partitioning leads to one machine getting over
> burdened (memory and number of tasks). We see a clear improvement in
> runtime when the partitioning is even (happens when count is moved).
>
> Any pointers in figuring out this issue is much appreciated.
>
> Regards,
> Raghava.
>
>
>
>
> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:
>
>> Glad to hear that the problem was solvable! I have not seen delays of
>> this type for later stages in jobs run by spark-submit, but I do not think
>> it impossible if your stage has no lineage dependence on other RDDs.
>>
>> I'm CC'ing the dev list to report of other users observing load imbalance
>> caused by unusual initial task scheduling. I don't know of ways to avoid
>> this other than creating a dummy task to synchronize the executors, but
>> hopefully someone from there can suggest other possibilities.
>>
>> Mike
>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m....@gmail.com>
>> wrote:
>>
>>> Mike,
>>>
>>> It turns out the executor delay, as you mentioned, is the cause. After
>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>> happen during later stages as well? We noticed the same behavior
>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>> later stage also.
>>>
>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>> there any other option to fix this?
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>>>
>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>> receive tasks in the first stage. The delay does not persist once the
>>>> executors have been synchronized.
>>>>
>>>> When the tasks are very short, as may be your case (relatively small
>>>> data and a simple map task like you have described), the 8 tasks in
>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>> the second executor won't have responded to the master before the
>>>> first 4 tasks on the first executor have completed.
>>>>
>>>> To see if this is the cause in your particular case, you could try the
>>>> following to confirm:
>>>>         1. Examine the starting times of the tasks alongside their
>>>> executor
>>>>         2. Make a "dummy" stage execute before your real stages to
>>>> synchronize the executors by creating and materializing any random RDD
>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>> work.
>>>>
>>>> Mike
>>>>
>>>>
>>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>>>> > Yes its the same data.
>>>> >
>>>> > 1) The number of partitions are the same (8, which is an argument to
>>>> the
>>>> > HashPartitioner). In the first case, these partitions are spread
>>>> across
>>>> > both the worker nodes. In the second case, all the partitions are on
>>>> the
>>>> > same node.
>>>> > 2) What resources would be of interest here? Scala shell takes the
>>>> default
>>>> > parameters since we use "bin/spark-shell --master <master-URL>" to
>>>> run the
>>>> > scala-shell. For the scala program, we do set some configuration
>>>> options
>>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>>> > serializer.
>>>> >
>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>> 14GB
>>>> > RAM.1 executor runs on each worker node. Following configuration
>>>> options
>>>> > are set for the scala program -- perhaps we should move it to the
>>>> spark
>>>> > config file.
>>>> >
>>>> > Driver memory and executor memory are set to 12GB
>>>> > parallelism is set to 8
>>>> > Kryo serializer is used
>>>> > Number of retainedJobs and retainedStages has been increased to check
>>>> them
>>>> > in the UI.
>>>> >
>>>> > What information regarding Spark Context would be of interest here?
>>>> >
>>>> > Regards,
>>>> > Raghava.
>>>> >
>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>>>> wrote:
>>>> >
>>>> >> If the data file is same then it should have similar distribution of
>>>> >> keys.
>>>> >> Few queries-
>>>> >>
>>>> >> 1. Did you compare the number of partitions in both the cases?
>>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>>> >> Program being submitted?
>>>> >>
>>>> >> Also, can you please share the details of Spark Context, Environment
>>>> and
>>>> >> Executors when you run via Scala program?
>>>> >>
>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>> >> m.vijayaraghava@gmail.com> wrote:
>>>> >>
>>>> >>> Hello All,
>>>> >>>
>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>> cluster (1
>>>> >>> master and 2 worker nodes).
>>>> >>>
>>>> >>> val u =
>>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>>> (y.toInt,
>>>> >>> x.toInt) } }).partitionBy(new
>>>> HashPartitioner(8)).setName("u").persist()
>>>> >>>
>>>> >>> u.count()
>>>> >>>
>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>> across
>>>> >>> the
>>>> >>> two worker nodes. But if we put this in a scala program and run it,
>>>> then
>>>> >>> all the data goes to only one node. We have run it multiple times,
>>>> but
>>>> >>> this
>>>> >>> behavior does not change. This seems strange.
>>>> >>>
>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>> >>>
>>>> >>> Thanks in advance.
>>>> >>>
>>>> >>> Regards,
>>>> >>> Raghava.
>>>> >>>
>>>> >>
>>>> >>
>>>> >
>>>> >
>>>> > --
>>>> > Regards,
>>>> > Raghava
>>>> > http://raghavam.github.io
>>>> >
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Mike
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Raghava
>>> http://raghavam.github.io
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>

Re: executor delay in Spark

Posted by Mike Hynes <91...@gmail.com>.
Could you change numPartitions to {16, 32, 64} and run your program for
each to see how many partitions are allocated to each worker? Let's see if
you experience an all-nothing imbalance that way; if so, my guess is that
something else is odd in your program logic or spark runtime environment,
but if not and your executors all receive at least *some* partitions, then
I still wouldn't rule out effects of scheduling delay. It's a simple test,
but it could give some insight.

Mike

his could still be a  scheduling  If only one has *all* partitions,  and
email me the log file? (If it's 10+ MB, just the first few thousand lines
are fine).
On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m....@gmail.com>
wrote:

> Mike, All,
>
> It turns out that the second time we encountered the uneven-partition
> issue is not due to spark-submit. It was resolved with the change in
> placement of count().
>
> Case-1:
>
> val numPartitions = 8
> // read uAxioms from HDFS, use hash partitioner on it and persist it
> // read type1Axioms from HDFS, use hash partitioner on it and persist it
> currDeltaURule1 = type1Axioms.join(uAxioms)
>                              .values
>                              .distinct(numPartitions)
>                              .partitionBy(hashPartitioner)
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>
>  .persist(StorageLevel.MEMORY_AND_DISK)
>                                        .count()
>
> <more transformations and actions>
>
> currDeltaURule1 RDD results in all the data on one node (there are 2
> worker nodes). If we move count(), the uneven partition issue is resolved.
>
> Case-2:
>
> currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
>
>  .persist(StorageLevel.MEMORY_AND_DISK)
>                                        <count not run here>
>
> <some more transformations that don't affect currDeltaURule1 rdd>
>
> <rdd.count()> -- this rdd depends on currDeltaURule1 and it gets executed.
> This resolved the uneven partitioning issue.
>
> I don't see why the moving of an action to a later part in the code would
> affect the partitioning. Are there other factors at play here that affect
> the partitioning?
>
> (Inconsistent) uneven partitioning leads to one machine getting over
> burdened (memory and number of tasks). We see a clear improvement in
> runtime when the partitioning is even (happens when count is moved).
>
> Any pointers in figuring out this issue is much appreciated.
>
> Regards,
> Raghava.
>
>
>
>
> On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:
>
>> Glad to hear that the problem was solvable! I have not seen delays of
>> this type for later stages in jobs run by spark-submit, but I do not think
>> it impossible if your stage has no lineage dependence on other RDDs.
>>
>> I'm CC'ing the dev list to report of other users observing load imbalance
>> caused by unusual initial task scheduling. I don't know of ways to avoid
>> this other than creating a dummy task to synchronize the executors, but
>> hopefully someone from there can suggest other possibilities.
>>
>> Mike
>> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m....@gmail.com>
>> wrote:
>>
>>> Mike,
>>>
>>> It turns out the executor delay, as you mentioned, is the cause. After
>>> we introduced a dummy stage, partitioning was working fine. Does this delay
>>> happen during later stages as well? We noticed the same behavior
>>> (partitioning happens on spark-shell but not through spark-submit) at a
>>> later stage also.
>>>
>>> Apart from introducing a dummy stage or running it from spark-shell, is
>>> there any other option to fix this?
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>>>
>>>> When submitting a job with spark-submit, I've observed delays (up to
>>>> 1--2 seconds) for the executors to respond to the driver in order to
>>>> receive tasks in the first stage. The delay does not persist once the
>>>> executors have been synchronized.
>>>>
>>>> When the tasks are very short, as may be your case (relatively small
>>>> data and a simple map task like you have described), the 8 tasks in
>>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>>> the second executor won't have responded to the master before the
>>>> first 4 tasks on the first executor have completed.
>>>>
>>>> To see if this is the cause in your particular case, you could try the
>>>> following to confirm:
>>>>         1. Examine the starting times of the tasks alongside their
>>>> executor
>>>>         2. Make a "dummy" stage execute before your real stages to
>>>> synchronize the executors by creating and materializing any random RDD
>>>>         3. Make the tasks longer, i.e. with some silly computational
>>>> work.
>>>>
>>>> Mike
>>>>
>>>>
>>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>>>> > Yes its the same data.
>>>> >
>>>> > 1) The number of partitions are the same (8, which is an argument to
>>>> the
>>>> > HashPartitioner). In the first case, these partitions are spread
>>>> across
>>>> > both the worker nodes. In the second case, all the partitions are on
>>>> the
>>>> > same node.
>>>> > 2) What resources would be of interest here? Scala shell takes the
>>>> default
>>>> > parameters since we use "bin/spark-shell --master <master-URL>" to
>>>> run the
>>>> > scala-shell. For the scala program, we do set some configuration
>>>> options
>>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>>> > serializer.
>>>> >
>>>> > We are running this on Azure D3-v2 machines which have 4 cores and
>>>> 14GB
>>>> > RAM.1 executor runs on each worker node. Following configuration
>>>> options
>>>> > are set for the scala program -- perhaps we should move it to the
>>>> spark
>>>> > config file.
>>>> >
>>>> > Driver memory and executor memory are set to 12GB
>>>> > parallelism is set to 8
>>>> > Kryo serializer is used
>>>> > Number of retainedJobs and retainedStages has been increased to check
>>>> them
>>>> > in the UI.
>>>> >
>>>> > What information regarding Spark Context would be of interest here?
>>>> >
>>>> > Regards,
>>>> > Raghava.
>>>> >
>>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>>>> wrote:
>>>> >
>>>> >> If the data file is same then it should have similar distribution of
>>>> >> keys.
>>>> >> Few queries-
>>>> >>
>>>> >> 1. Did you compare the number of partitions in both the cases?
>>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>>> >> Program being submitted?
>>>> >>
>>>> >> Also, can you please share the details of Spark Context, Environment
>>>> and
>>>> >> Executors when you run via Scala program?
>>>> >>
>>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>>> >> m.vijayaraghava@gmail.com> wrote:
>>>> >>
>>>> >>> Hello All,
>>>> >>>
>>>> >>> We are using HashPartitioner in the following way on a 3 node
>>>> cluster (1
>>>> >>> master and 2 worker nodes).
>>>> >>>
>>>> >>> val u =
>>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>>> (y.toInt,
>>>> >>> x.toInt) } }).partitionBy(new
>>>> HashPartitioner(8)).setName("u").persist()
>>>> >>>
>>>> >>> u.count()
>>>> >>>
>>>> >>> If we run this from the spark shell, the data (52 MB) is split
>>>> across
>>>> >>> the
>>>> >>> two worker nodes. But if we put this in a scala program and run it,
>>>> then
>>>> >>> all the data goes to only one node. We have run it multiple times,
>>>> but
>>>> >>> this
>>>> >>> behavior does not change. This seems strange.
>>>> >>>
>>>> >>> Is there some problem with the way we use HashPartitioner?
>>>> >>>
>>>> >>> Thanks in advance.
>>>> >>>
>>>> >>> Regards,
>>>> >>> Raghava.
>>>> >>>
>>>> >>
>>>> >>
>>>> >
>>>> >
>>>> > --
>>>> > Regards,
>>>> > Raghava
>>>> > http://raghavam.github.io
>>>> >
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Mike
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Raghava
>>> http://raghavam.github.io
>>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>

Re: executor delay in Spark

Posted by Raghava Mutharaju <m....@gmail.com>.
Mike, All,

It turns out that the second time we encountered the uneven-partition issue
is not due to spark-submit. It was resolved with the change in placement of
count().

Case-1:

val numPartitions = 8
// read uAxioms from HDFS, use hash partitioner on it and persist it
// read type1Axioms from HDFS, use hash partitioner on it and persist it
currDeltaURule1 = type1Axioms.join(uAxioms)
                             .values
                             .distinct(numPartitions)
                             .partitionBy(hashPartitioner)
currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)

 .persist(StorageLevel.MEMORY_AND_DISK)
                                       .count()

<more transformations and actions>

currDeltaURule1 RDD results in all the data on one node (there are 2 worker
nodes). If we move count(), the uneven partition issue is resolved.

Case-2:

currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)

 .persist(StorageLevel.MEMORY_AND_DISK)
                                       <count not run here>

<some more transformations that don't affect currDeltaURule1 rdd>

<rdd.count()> -- this rdd depends on currDeltaURule1 and it gets executed.
This resolved the uneven partitioning issue.

I don't see why the moving of an action to a later part in the code would
affect the partitioning. Are there other factors at play here that affect
the partitioning?

(Inconsistent) uneven partitioning leads to one machine getting over
burdened (memory and number of tasks). We see a clear improvement in
runtime when the partitioning is even (happens when count is moved).

Any pointers in figuring out this issue is much appreciated.

Regards,
Raghava.




On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91...@gmail.com> wrote:

> Glad to hear that the problem was solvable! I have not seen delays of this
> type for later stages in jobs run by spark-submit, but I do not think it
> impossible if your stage has no lineage dependence on other RDDs.
>
> I'm CC'ing the dev list to report of other users observing load imbalance
> caused by unusual initial task scheduling. I don't know of ways to avoid
> this other than creating a dummy task to synchronize the executors, but
> hopefully someone from there can suggest other possibilities.
>
> Mike
> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m....@gmail.com>
> wrote:
>
>> Mike,
>>
>> It turns out the executor delay, as you mentioned, is the cause. After we
>> introduced a dummy stage, partitioning was working fine. Does this delay
>> happen during later stages as well? We noticed the same behavior
>> (partitioning happens on spark-shell but not through spark-submit) at a
>> later stage also.
>>
>> Apart from introducing a dummy stage or running it from spark-shell, is
>> there any other option to fix this?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91...@gmail.com> wrote:
>>
>>> When submitting a job with spark-submit, I've observed delays (up to
>>> 1--2 seconds) for the executors to respond to the driver in order to
>>> receive tasks in the first stage. The delay does not persist once the
>>> executors have been synchronized.
>>>
>>> When the tasks are very short, as may be your case (relatively small
>>> data and a simple map task like you have described), the 8 tasks in
>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> the second executor won't have responded to the master before the
>>> first 4 tasks on the first executor have completed.
>>>
>>> To see if this is the cause in your particular case, you could try the
>>> following to confirm:
>>>         1. Examine the starting times of the tasks alongside their
>>> executor
>>>         2. Make a "dummy" stage execute before your real stages to
>>> synchronize the executors by creating and materializing any random RDD
>>>         3. Make the tasks longer, i.e. with some silly computational
>>> work.
>>>
>>> Mike
>>>
>>>
>>> On 4/17/16, Raghava Mutharaju <m....@gmail.com> wrote:
>>> > Yes its the same data.
>>> >
>>> > 1) The number of partitions are the same (8, which is an argument to
>>> the
>>> > HashPartitioner). In the first case, these partitions are spread across
>>> > both the worker nodes. In the second case, all the partitions are on
>>> the
>>> > same node.
>>> > 2) What resources would be of interest here? Scala shell takes the
>>> default
>>> > parameters since we use "bin/spark-shell --master <master-URL>" to run
>>> the
>>> > scala-shell. For the scala program, we do set some configuration
>>> options
>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>> > serializer.
>>> >
>>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>>> > RAM.1 executor runs on each worker node. Following configuration
>>> options
>>> > are set for the scala program -- perhaps we should move it to the spark
>>> > config file.
>>> >
>>> > Driver memory and executor memory are set to 12GB
>>> > parallelism is set to 8
>>> > Kryo serializer is used
>>> > Number of retainedJobs and retainedStages has been increased to check
>>> them
>>> > in the UI.
>>> >
>>> > What information regarding Spark Context would be of interest here?
>>> >
>>> > Regards,
>>> > Raghava.
>>> >
>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <an...@gmail.com>
>>> wrote:
>>> >
>>> >> If the data file is same then it should have similar distribution of
>>> >> keys.
>>> >> Few queries-
>>> >>
>>> >> 1. Did you compare the number of partitions in both the cases?
>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> >> Program being submitted?
>>> >>
>>> >> Also, can you please share the details of Spark Context, Environment
>>> and
>>> >> Executors when you run via Scala program?
>>> >>
>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> >> m.vijayaraghava@gmail.com> wrote:
>>> >>
>>> >>> Hello All,
>>> >>>
>>> >>> We are using HashPartitioner in the following way on a 3 node
>>> cluster (1
>>> >>> master and 2 worker nodes).
>>> >>>
>>> >>> val u =
>>> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>> >>> Int)](line => { line.split("\\|") match { case Array(x, y) =>
>>> (y.toInt,
>>> >>> x.toInt) } }).partitionBy(new
>>> HashPartitioner(8)).setName("u").persist()
>>> >>>
>>> >>> u.count()
>>> >>>
>>> >>> If we run this from the spark shell, the data (52 MB) is split across
>>> >>> the
>>> >>> two worker nodes. But if we put this in a scala program and run it,
>>> then
>>> >>> all the data goes to only one node. We have run it multiple times,
>>> but
>>> >>> this
>>> >>> behavior does not change. This seems strange.
>>> >>>
>>> >>> Is there some problem with the way we use HashPartitioner?
>>> >>>
>>> >>> Thanks in advance.
>>> >>>
>>> >>> Regards,
>>> >>> Raghava.
>>> >>>
>>> >>
>>> >>
>>> >
>>> >
>>> > --
>>> > Regards,
>>> > Raghava
>>> > http://raghavam.github.io
>>> >
>>>
>>>
>>> --
>>> Thanks,
>>> Mike
>>>
>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>


-- 
Regards,
Raghava
http://raghavam.github.io