Posted to user@spark.apache.org by JF Chen <da...@gmail.com> on 2019/03/04 10:02:15 UTC

spark df.write.partitionBy run very slow

I am trying to write a dataset to HDFS via df.write.partitionBy(column_a,
column_b, column_c).parquet(output_path).
However, it takes several minutes to write only a few hundred MB of data to
HDFS.
According to this article
<https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
adding a repartition call before the write should help. But if the data is
skewed, some tasks take much longer than average, so the write is still slow.
How can I solve this problem? Thanks in advance!


Regard,
Junfeng Chen
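
As a rough point of reference, here is a minimal sketch (in Scala) of the
repartition-before-write idea from the linked Stack Overflow answer, assuming
a dataframe df and the same output_path; the column names are just the
placeholders used above:

    import org.apache.spark.sql.functions.col

    // Shuffle rows so that all data for one (column_a, column_b, column_c)
    // combination lands in the same task before the partitioned write; this
    // cuts down the number of small files each task creates per directory.
    val repartitioned = df.repartition(col("column_a"), col("column_b"), col("column_c"))

    repartitioned.write
      .partitionBy("column_a", "column_b", "column_c")
      .parquet(output_path)

If the partition columns themselves are skewed, this repartition only moves
the imbalance into fewer, larger tasks, which matches the concern raised above.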

Re: spark df.write.partitionBy run very slow

Posted by Shyam P <sh...@gmail.com>.
You can check the "Executors" tab on the Spark UI screen...


Re: spark df.write.partitionBy run very slow

Posted by JF Chen <da...@gmail.com>.
But now I have another question: how can I determine which data node a Spark
task is writing to? That is really important for digging into the problem.

Regard,
Junfeng Chen
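
One possible way to answer this from the driver side, sketched with the Hadoop
FileSystem API and assuming a SparkSession named spark and an illustrative
output path: list which hosts hold the blocks of each file already written,
then compare the slow tasks' files against the suspect datanodes.

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Walk the written output and print, per file, the datanodes that host
    // its blocks. Files produced by the slow tasks can then be mapped to
    // specific datanodes.
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val it = fs.listFiles(new Path("/path/to/output_path"), /* recursive = */ true)
    while (it.hasNext) {
      val status = it.next()
      val hosts = status.getBlockLocations.flatMap(_.getHosts).distinct.mkString(", ")
      println(s"${status.getPath} -> $hosts")
    }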



Re: spark df.write.partitionBy run very slow

Posted by Shyam P <sh...@gmail.com>.
cool.


Re: spark df.write.partitionBy run very slow

Posted by JF Chen <da...@gmail.com>.
Hi
Finally I found the reason...
It was caused by long GC pauses on some datanodes. After receiving data from
the executors, a datanode stuck in a long GC cannot report its blocks to the
namenode, so the write takes a long time.
I have decommissioned the broken datanodes, and now my Spark job runs well.
I am also trying to increase the datanode heap size to see whether that
resolves the problem.

Regard,
Junfeng Chen



Re: spark df.write.partitionBy run very slow

Posted by Shyam P <sh...@gmail.com>.
Did you check this, i.e. how many partitions it shows and the record counts?

//count by partition_id
        import org.apache.spark.sql.functions.spark_partition_id
        df.groupBy(spark_partition_id).count.show()

Are you getting the same number of parquet files?

Gradually increase the sample size.
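
A complementary check, offered here as a sketch rather than something from the
thread: counting rows per combination of the partitionBy columns shows whether
one output directory receives far more data than the others, independently of
how the Spark partitions look (column names are the placeholders from the
original question):

    import org.apache.spark.sql.functions.{col, desc}

    // Rows per distinct (column_a, column_b, column_c) combination, i.e. per
    // output directory; a heavily lopsided result means the partition columns
    // themselves are skewed.
    df.groupBy(col("column_a"), col("column_b"), col("column_c"))
      .count()
      .orderBy(desc("count"))
      .show(50, truncate = false)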


Re: spark df.write.partitionBy run very slow

Posted by JF Chen <da...@gmail.com>.
I checked my partitionBy call again: it is partitionBy(appname, year, month,
day, hour), and the number of distinct values of appname is much higher than
that of year, month, day, and hour. My Spark Streaming app runs every
5 minutes, so year, month, day, and hour are usually the same within a batch.
So will the number of appname partitions affect the writing efficiency?

Regard,
Junfeng Chen
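
A hedged sketch of one way this could be addressed, assuming an extra shuffle
before the write is acceptable (the repartition on appname is an illustration,
not something suggested in the thread):

    import org.apache.spark.sql.functions.col

    // With year/month/day/hour effectively constant inside a 5-minute batch,
    // appname is the only column that fans out into many directories.
    // Shuffling by appname first means each appname directory is written by
    // roughly one task instead of a little by every task.
    df.repartition(col("appname"))
      .write
      .partitionBy("appname", "year", "month", "day", "hour")
      .parquet(output_path)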



Re: spark df.write.partitionBy run very slow

Posted by JF Chen <da...@gmail.com>.
Yes, I agree.

From the Spark UI I can confirm the data is not skewed. Each task handles only
about 100 MB, yet most tasks take several seconds to write their data to HDFS
while some take minutes.

Regard,
Junfeng Chen



Re: spark df.write.partitionBy run very slow

Posted by Shyam P <sh...@gmail.com>.
Hi JF,
Yes, first we should know the actual number of partitions the dataframe has
and how many records each one holds. Accordingly, we should try to spread the
data evenly across all partitions.
It is always better to have num of partitions = N * num of executors.

"But the sequence of columns in partitionBy decides the directory hierarchy
structure. I hope the sequence of columns does not change": this is correct.
Hence it sometimes helps to put the higher-cardinality column first and the
lower-cardinality ones after, i.e. more parent directories and fewer child
directories. Tweak it and try.

"some tasks in the write-to-HDFS stage cost much more time than others": maybe
the data is skewed and needs to be distributed evenly across all partitions.

~Shyam
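
A small sketch of the sizing rule above; the executor count and the multiplier
N are illustrative numbers, not values from the thread:

    // Aim for a few partitions per executor so each one stays busy and no
    // single task writes an outsized chunk of the data.
    val numExecutors = 10   // illustrative; take this from your job config
    val n = 3               // small multiplier, tune empirically
    val balanced = df.repartition(n * numExecutors)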


Re: spark df.write.partitionBy run very slow

Posted by JF Chen <da...@gmail.com>.
Hi Shyam
Thanks for your reply.
You mean that, after finding out the partition counts of column_a, column_b,
and column_c, the order of the columns in partitionBy should match the order
of those counts for columns a, b, and c?
But the sequence of columns in partitionBy decides the directory hierarchy
structure, and I hope the sequence of columns does not change.

And I found one more strange thing: some tasks in the write-to-HDFS stage take
much more time than others, even though the amount of data written is similar.
How can I solve that?

Regard,
Junfeng Chen



Re: spark df.write.partitionBy run very slow

Posted by Shyam P <sh...@gmail.com>.
Hi JF,
Try to execute this before df.write:

//count by partition_id
        import org.apache.spark.sql.functions.spark_partition_id
        df.groupBy(spark_partition_id).count.show()

You will come to know how the data has been partitioned inside the df.

A small trick we can apply here with partitionBy(column_a, column_b,
column_c): make sure that (column_a partitions) > (column_b partitions) >
(column_c partitions).

Try this.

Regards,
Shyam
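
A quick way to check the cardinality ordering that this trick relies on,
sketched with countDistinct (column names as in the original question):

    import org.apache.spark.sql.functions.countDistinct

    // Distinct values per candidate partition column; per the suggestion
    // above, order partitionBy from the highest count to the lowest.
    df.select(
      countDistinct("column_a").as("distinct_a"),
      countDistinct("column_b").as("distinct_b"),
      countDistinct("column_c").as("distinct_c")
    ).show()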
