Posted to user@spark.apache.org by Jianneng Li <ji...@workday.com> on 2020/02/25 02:15:36 UTC

[Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

Hello everyone,

WholeStageCodegen generates code that appends results<https://github.com/apache/spark/blob/v3.0.0-preview2/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L771> into a BufferedRowIterator, which keeps the results in an in-memory linked list<https://github.com/apache/spark/blob/v3.0.0-preview2/sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java#L34>. Long story short, this is a problem when multiple joins (e.g. BroadcastHashJoin) whose output can blow up in size get planned into the same WholeStageCodegen: results keep accumulating in the linked list and do not get consumed fast enough, eventually causing the JVM to run out of memory.
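
To make the accumulation concrete, here is a rough, pure-Python model of the behaviour described above. It is not Spark's generated code: `push_row`, `build_tables`, and the fan-out of 20 matches per join are hypothetical names and numbers chosen for illustration. It only shows how a single input row pushed through three chained hash joins can append thousands of rows to a buffer before any consumer gets to read one.

```python
from collections import deque

def push_row(row, build_tables, buffer, depth=0):
    """Push one streamed row through a chain of hash joins.

    Every fully joined row is appended to `buffer`; nothing is consumed
    until the whole fan-out for this input row has been produced.
    """
    if depth == len(build_tables):
        buffer.append(row)
        return
    # row[0] is the join key; each build table maps key -> list of matches.
    for match in build_tables[depth].get(row[0], ()):
        push_row(row + (match,), build_tables, buffer, depth + 1)

# Hypothetical setup: three joins, each with 20 matches for key 1.
build_tables = [{1: list(range(20))} for _ in range(3)]
buffer = deque()  # stands in for the in-memory linked list

push_row((1,), build_tables, buffer)
print(len(buffer))  # 20 * 20 * 20 = 8000 rows buffered for ONE input row
```

With a fan-out of f per join and k joins in the same stage, this model buffers f**k rows per input row, which is why adding more joins to one stage can hurt so quickly.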

Does anyone else have experience with this problem? Some obvious solutions include making BufferedRowIterator spill the linked list or making it bounded, but I'd imagine that this would have been done a long time ago if it were necessary.
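
As a point of comparison for the "bounded" idea, the sketch below swaps in a different technique: a pull-based Python generator that hands rows to the consumer one at a time, so at most one joined row is in flight. This is only an illustration of the memory difference, not a proposal for how Spark would or should implement it; `iter_joined` and `build_tables` are hypothetical names.

```python
def iter_joined(row, build_tables, depth=0):
    """Lazily yield joined rows for one streamed row, one at a time."""
    if depth == len(build_tables):
        yield row
        return
    # row[0] is the join key; each build table maps key -> list of matches.
    for match in build_tables[depth].get(row[0], ()):
        yield from iter_joined(row + (match,), build_tables, depth + 1)

# Hypothetical setup: three joins, each with 20 matches for key 1.
build_tables = [{1: list(range(20))} for _ in range(3)]

rows = iter_joined((1,), build_tables)
first = next(rows)                 # the consumer sees a row immediately
remaining = sum(1 for _ in rows)   # the rest are produced and dropped one by one
print(first, remaining)
```

The same 8000 rows are produced in total, but none of them has to wait in a list for the consumer to catch up.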

Thanks,

Jianneng

Re: [Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

Posted by Liu Genie <ge...@outlook.com>.
Exactly. In my case, a big DataFrame is joined with a lot of small DataFrames, so I convert the small ones to Maps and apply a UDF to the big DataFrame. (Broadcast joins didn’t work with that many joins.)

On Feb 26, 2020, at 09:32, Jianneng Li <ji...@workday.com> wrote:

I could be wrong, but I'm guessing that it uses the UDF as the build side<https://en.wikipedia.org/wiki/Hash_join#Classic_hash_join> of a hash join: the hash table lives inside the UDF, and the UDF is called to perform the join. There are limitations to this approach, of course; you can't do all joins this way.

Best,

Jianneng
________________________________
From: yeikel valdes <em...@yeikel.com>
Sent: Tuesday, February 25, 2020 5:48 AM
To: Jianneng Li <ji...@workday.com>
Cc: user@spark.apache.org <us...@spark.apache.org>; genie_liu@outlook.com <ge...@outlook.com>
Subject: Re: [Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

Can you please explain what you mean by that? How do you use a UDF to replace a join? Thanks



---- On Mon, 24 Feb 2020 22:06:40 -0500 jianneng.li@workday.com wrote ----

Thanks Genie. Unfortunately, the joins I'm doing in this case are large, so a UDF likely won't work.

Jianneng
________________________________
From: Liu Genie <ge...@outlook.com>
Sent: Monday, February 24, 2020 6:39 PM
To: user@spark.apache.org <us...@spark.apache.org>
Subject: Re: [Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

I have run into the too-many-joins problem before. Since the joined DataFrame was small enough, I converted the join to a UDF operation, which was much faster and didn’t cause an out-of-memory problem.



Re: [Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

Posted by Jianneng Li <ji...@workday.com>.
I could be wrong, but I'm guessing that it uses the UDF as the build side<https://en.wikipedia.org/wiki/Hash_join#Classic_hash_join> of a hash join: the hash table lives inside the UDF, and the UDF is called to perform the join. There are limitations to this approach, of course; you can't do all joins this way.

Best,

Jianneng
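
The idea discussed in this reply can be sketched in plain Python, without Spark, as follows. The table contents and the names `small_table`, `big_table`, `lookup`, and `country_code_udf` are made up for illustration. In a real Spark job the dictionary would presumably be shipped to executors (for example as a broadcast variable) and the function registered as a UDF, but that wiring is an assumption and is not shown here.

```python
# Hypothetical data: a small dimension table and a large fact table.
small_table = [(1, "US"), (2, "DE"), (3, "JP")]        # (country_id, country_code)
big_table = [("alice", 1), ("bob", 3), ("carol", 9)]   # (user, country_id)

# "Build side": turn the small table into a hash table (a plain dict here).
lookup = dict(small_table)

def country_code_udf(country_id):
    """Probe the hash table; returns None when there is no match."""
    return lookup.get(country_id)

# "Probe side": apply the function to every row of the big table.
joined = [(user, cid, country_code_udf(cid)) for user, cid in big_table]
print(joined)
# [('alice', 1, 'US'), ('bob', 3, 'JP'), ('carol', 9, None)]
```

This behaves like a left outer join with at most one match per key. A dict keeps a single value per key, so a build side with duplicate keys, or a join that is not a simple equality lookup, would not fit this pattern, which is one reading of the limitations mentioned above.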



Re: [Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

Posted by yeikel valdes <em...@yeikel.com>.
Can you please explain what you mean by that? How do you use a UDF to replace a join? Thanks






Re: [Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

Posted by Jianneng Li <ji...@workday.com>.
Thanks Genie. Unfortunately, the joins I'm doing in this case are large, so a UDF likely won't work.

Jianneng


Re: [Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

Posted by Liu Genie <ge...@outlook.com>.
I have run into the too-many-joins problem before. Since the joined DataFrame was small enough, I converted the join to a UDF operation, which was much faster and didn’t cause an out-of-memory problem.
