Posted to user@spark.apache.org by Ori Popowski <or...@gmail.com> on 2022/05/24 09:38:50 UTC

Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

Hello

I migrated a job from EMR with Spark 2.4.4 to Dataproc with Spark 2.4.8. I
am creating a cluster with the exact same configuration, where the only
difference is that the original cluster uses 78 workers with 96 CPUs and
768GiB memory each, and in the new cluster I am using 117 machines with 64
CPUs and 512GiB each, to achieve the same amount of resources in the
cluster.
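
As a quick sanity check of the sizing described above, the totals can be multiplied out. This is only a sketch using the worker counts, CPU counts, and memory sizes quoted in this message; the dictionary and function names are made up for illustration.

```python
# Figures taken from the message above; the names are illustrative only.
emr = {"workers": 78, "cpus": 96, "mem_gib": 768}
dataproc = {"workers": 117, "cpus": 64, "mem_gib": 512}


def totals(cluster):
    """Return (total CPUs, total memory in GiB) across all workers."""
    return (
        cluster["workers"] * cluster["cpus"],
        cluster["workers"] * cluster["mem_gib"],
    )


emr_totals = totals(emr)
dataproc_totals = totals(dataproc)
print(emr_totals, dataproc_totals)
```

Both clusters come out at 7488 CPUs and 59904 GiB of memory, so the raw capacity is indeed the same.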

The job is run with the same configuration (num of partitions, parallelism,
etc.) and reads the same data. However, something strange happens and the
job takes 20 hours. What I observed is that there is a stage where the
driver instantiates a single task, and this task never starts because the
shuffle of moving all the data to it takes forever.

I also compared the runtime configuration and found some minor differences
(due to Dataproc being different from EMR) but I haven't found any
substantial difference.

In other stages the cluster utilizes all the partitions (400), and it's not
clear to me why it decides to invoke a single task.

Can anyone provide an insight as to why such a thing would happen?

Thanks

Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,
Just to elaborate on what Ranadip has correctly pointed out here: a gzip file
can only be read by a single task, whereas a bzip2 file is splittable and can
be read by multiple tasks, so reading it is parallelised and faster.

Try using bzip2 for Kafka Connect.

Regards,
Gourav Sengupta

On Mon, May 30, 2022 at 10:06 PM Ranadip Chatterjee <ra...@gmail.com>
wrote:

> Gzip files are not splittable. Hence using very large (i.e. non
> partitioned) gzip files leads to contention when reading the files, as
> readers cannot scale beyond the number of gzip files to read.
>
> Better to use a splittable compression format instead to allow frameworks
> to scale up. Or manually manage scaling by using partitions, as you are
> doing now.

Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

Posted by Ranadip Chatterjee <ra...@gmail.com>.
Gzip files are not splittable. Hence using very large (i.e. non
partitioned) gzip files leads to contention when reading the files, as
readers cannot scale beyond the number of gzip files to read.

Better to use a splittable compression format instead to allow frameworks
to scale up. Or manually manage scaling by using partitions, as you are
doing now.
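
To make the point about splittability concrete, here is a rough model of how many read tasks a set of input files can yield. It is a simplified sketch, not Spark's actual split planning: the function name is hypothetical, the 128 MiB split size is an assumption modelled on the default of `spark.sql.files.maxPartitionBytes`, and the file layouts in the example are invented (the thread does not say how many files the input has).

```python
import math

MIB = 1024 ** 2
GIB = 1024 ** 3


def estimate_read_tasks(file_sizes, splittable, split_bytes=128 * MIB):
    """Rough bound on the number of parallel read tasks for some input files.

    Simplified model (assumption): a splittable file yields one task per
    split_bytes chunk, while a non-splittable file (such as gzip) yields
    exactly one task regardless of its size.
    """
    if splittable:
        return sum(max(1, math.ceil(size / split_bytes)) for size in file_sizes)
    return len(file_sizes)


# Hypothetical layouts holding the same 700 GiB of data.
one_big_gzip = estimate_read_tasks([700 * GIB], splittable=False)
one_big_bzip2 = estimate_read_tasks([700 * GIB], splittable=True)
many_small_gzips = estimate_read_tasks([256 * MIB] * 2800, splittable=False)
print(one_big_gzip, one_big_bzip2, many_small_gzips)
```

Under this model a single huge gzip file can only ever feed one task, while either a splittable codec or many smaller gzip files lets the read scale out.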

On Mon, 30 May 2022, 08:54 Ori Popowski, <or...@gmail.com> wrote:

> Thanks.
>
> Eventually the problem was solved. I am still not 100% sure what caused it
> but when I said the input was identical I simplified a bit because it was
> not (sorry for misleading, I thought this information would just be noise).
> Explanation: the input to the EMR job was gzips created by Firehose and
> partitioned hourly. The input to the Dataproc job is gzips created by Kafka
> Connect and is not partitioned hourly. Otherwise the *content* itself is
> identical.
>
> When we started partitioning the files hourly the problem went away
> completely.
>
> I am still not sure what's going on exactly. If someone has some insight
> it's welcome.
>
> Thanks!

Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

Posted by Ori Popowski <or...@gmail.com>.
Thanks.

Eventually the problem was solved. I am still not 100% sure what caused it
but when I said the input was identical I simplified a bit because it was
not (sorry for misleading, I thought this information would just be noise).
Explanation: the input to the EMR job was gzips created by Firehose and
partitioned hourly. The input to the Dataproc job is gzips created by Kafka
Connect and is not partitioned hourly. Otherwise the *content* itself is
identical.

When we started partitioning the files hourly the problem went away
completely.

I am still not sure what's going on exactly. If someone has some insight
it's welcome.

Thanks!

On Fri, May 27, 2022 at 9:45 PM Aniket Mokashi <an...@gmail.com> wrote:

> +cloud-dataproc-discuss

Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

Posted by Aniket Mokashi <an...@gmail.com>.
+cloud-dataproc-discuss

On Wed, May 25, 2022 at 12:33 AM Ranadip Chatterjee <ra...@gmail.com>
wrote:

> To me, it seems like the data being processed on the 2 systems is not
> identical. I can't think of any other reason why the single-task stage would
> get a different number of input records in the 2 cases. 700 GB of input to a
> single task is not good, and seems to be the bottleneck.

-- 
"...:::Aniket:::... Quetzalco@tl"

Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

Posted by Ranadip Chatterjee <ra...@gmail.com>.
To me, it seems like the data being processed on the 2 systems is not
identical. I can't think of any other reason why the single-task stage would
get a different number of input records in the 2 cases. 700 GB of input to a
single task is not good, and seems to be the bottleneck.

On Wed, 25 May 2022, 06:32 Ori Popowski, <or...@gmail.com> wrote:

> Hi,
>
> Both jobs use spark.dynamicAllocation.enabled so there's no need to
> change the number of executors. There are 702 executors in the Dataproc
> cluster so this is not the problem.
> About number of partitions - this I didn't change and it's still 400.
> While writing this now, I am realising that I have fewer partitions than
> executors, but the same situation applies to EMR.
>
> I am observing 1 task in the final stage also on EMR. The difference is
> that on EMR that task receives about 50K of data, while on Dataproc it
> receives 700 GB. I don't understand why it's happening. It could mean that
> the graph is different. But the job is exactly the same. Could it be because
> the minor version of Spark is different?

Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

Posted by Ori Popowski <or...@gmail.com>.
Hi,

Both jobs use spark.dynamicAllocation.enabled so there's no need to change
the number of executors. There are 702 executors in the Dataproc cluster so
this is not the problem.
About number of partitions - this I didn't change and it's still 400. While
writing this now, I am realising that I have fewer partitions than
executors, but the same situation applies to EMR.

I am observing 1 task in the final stage also on EMR. The difference is
that on EMR that task receives about 50K of data, while on Dataproc it
receives 700 GB. I don't understand why it's happening. It could mean that
the graph is different. But the job is exactly the same. Could it be because
the minor version of Spark is different?

On Wed, May 25, 2022 at 12:27 AM Ranadip Chatterjee <ra...@gmail.com>
wrote:

> Hi Ori,
>
> A single task for the final step can result from various scenarios, such as
> an aggregate operation that results in only 1 value (e.g. count) or a
> key-based aggregate with only 1 key. There could be other scenarios as
> well. However, that would be the case in both EMR and Dataproc if the same
> code is run on the same data in both cases.
>
> On a separate note, since you have now changed the size and number of
> nodes, you may need to re-optimize the number and size of executors for the
> job and perhaps the number of partitions as well to optimally use the
> cluster resources.
>
> Regards,
> Ranadip

Re: Job migrated from EMR to Dataproc takes 20 hours instead of 90 minutes

Posted by Ranadip Chatterjee <ra...@gmail.com>.
Hi Ori,

A single task for the final step can result from various scenarios, such as
an aggregate operation that results in only 1 value (e.g. count) or a
key-based aggregate with only 1 key. There could be other scenarios as
well. However, that would be the case in both EMR and Dataproc if the same
code is run on the same data in both cases.
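
The single-key scenario can be illustrated with a toy model of hash partitioning. This is a plain Python sketch, not Spark code: the function name is hypothetical and Python's built-in `hash` stands in for whatever partitioner the job actually uses. The 400 matches the partition count mentioned in this thread.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count records per shuffle partition under simple hash partitioning."""
    return Counter(hash(key) % num_partitions for key in keys)


# One distinct key: every record lands in the same partition,
# so only one task in the next stage has any work to do.
single_key = partition_sizes(["all"] * 10_000, num_partitions=400)

# Many distinct keys: the records spread across all 400 partitions.
many_keys = partition_sizes(range(4_000), num_partitions=400)

print(len(single_key), len(many_keys))
```

With a single key, one partition receives all 10,000 records, which is the same shape of problem as one task receiving all of the shuffled data.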

On a separate note, since you have now changed the size and number of
nodes, you may need to re-optimize the number and size of executors for the
job and perhaps the number of partitions as well to optimally use the
cluster resources.
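
As a rough illustration of how partition count and executor count interact, the sketch below bounds how many tasks of one stage can run at once. The function names are hypothetical, and the value of 10 cores per executor is purely an assumption since the thread does not state it; the 400 partitions and 702 executors are the figures mentioned elsewhere in this thread.

```python
def max_concurrent_tasks(num_partitions, num_executors, cores_per_executor,
                         task_cpus=1):
    """Upper bound on tasks of a single stage that can run at the same time.

    A stage has one task per partition, and each task occupies task_cpus
    cores, so concurrency is capped by both partitions and task slots.
    """
    slots = num_executors * (cores_per_executor // task_cpus)
    return min(num_partitions, slots)


def min_idle_executors(num_partitions, num_executors):
    """Executors that cannot receive a task even with one task per executor."""
    return max(0, num_executors - num_partitions)


# 400 partitions and 702 executors come from the thread;
# 10 cores per executor is an assumed value.
concurrent = max_concurrent_tasks(400, 702, cores_per_executor=10)
idle = min_idle_executors(400, 702)
print(concurrent, idle)
```

Under these assumptions a 400-partition stage keeps at most 400 tasks busy, so at least 302 of the 702 executors have nothing to do during that stage.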

Regards,
Ranadip

On Tue, 24 May 2022, 10:45 Ori Popowski, <or...@gmail.com> wrote:

> Hello
>
> I migrated a job from EMR with Spark 2.4.4 to Dataproc with Spark 2.4.8. I
> am creating a cluster with the exact same configuration, where the only
> difference is that the original cluster uses 78 workers with 96 CPUs and
> 768GiB memory each, and in the new cluster I am using 117 machines with 64
> CPUs and 512GiB each, to achieve the same amount of resources in the
> cluster.
>
> The job is run with the same configuration (num of partitions,
> parallelism, etc.) and reads the same data. However, something strange
> happens and the job takes 20 hours. What I observed is that there is a
> stage where the driver instantiates a single task, and this task never
> starts because the shuffle of moving all the data to it takes forever.
>
> I also compared the runtime configuration and found some minor differences
> (due to Dataproc being different from EMR) but I haven't found any
> substantial difference.
>
> In other stages the cluster utilizes all the partitions (400), and it's
> not clear to me why it decides to invoke a single task.
>
> Can anyone provide an insight as to why such a thing would happen?
>
> Thanks