You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2023/03/28 12:42:00 UTC

[jira] [Assigned] (SPARK-42937) Join with subquery in condition can fail with wholestage codegen and adaptive execution disabled

     [ https://issues.apache.org/jira/browse/SPARK-42937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun reassigned SPARK-42937:
-------------------------------------

    Assignee: Bruce Robbins

> Join with subquery in condition can fail with wholestage codegen and adaptive execution disabled
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42937
>                 URL: https://issues.apache.org/jira/browse/SPARK-42937
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.2, 3.4.0, 3.5.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Major
>
> The below left outer join gets an error:
> {noformat}
> create or replace temp view v1 as
> select * from values
> (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1),
> (2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2),
> (3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
> as v1(key, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10);
> create or replace temp view v2 as
> select * from values
> (1, 2),
> (3, 8),
> (7, 9)
> as v2(a, b);
> create or replace temp view v3 as
> select * from values
> (3),
> (8)
> as v3(col1);
> set spark.sql.codegen.maxFields=10; -- let's make maxFields 10 instead of 100
> set spark.sql.adaptive.enabled=false;
> select *
> from v1
> left outer join v2
> on key = a
> and key in (select col1 from v3);
> {noformat}
> The join fails during predicate codegen:
> {noformat}
> 23/03/27 12:24:12 WARN Predicate: Expr codegen error and falling back to interpreter mode
> java.lang.IllegalArgumentException: requirement failed: input[0, int, false] IN subquery#34 has not finished
> 	at scala.Predef$.require(Predef.scala:281)
> 	at org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
> 	at org.apache.spark.sql.execution.InSubqueryExec.doGenCode(subquery.scala:156)
> 	at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201)
> 	at scala.Option.getOrElse(Option.scala:189)
> 	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$2(CodeGenerator.scala:1278)
> 	at scala.collection.immutable.List.map(List.scala:293)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1278)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:41)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.generate(GeneratePredicate.scala:33)
> 	at org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:73)
> 	at org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:70)
> 	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
> 	at org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:86)
> 	at org.apache.spark.sql.execution.joins.HashJoin.boundCondition(HashJoin.scala:146)
> 	at org.apache.spark.sql.execution.joins.HashJoin.boundCondition$(HashJoin.scala:140)
> 	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition$lzycompute(BroadcastHashJoinExec.scala:40)
> 	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition(BroadcastHashJoinExec.scala:40)
> {noformat}
> It fails again after fallback to interpreter mode:
> {noformat}
> 23/03/27 12:24:12 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
> java.lang.IllegalArgumentException: requirement failed: input[0, int, false] IN subquery#34 has not finished
> 	at scala.Predef$.require(Predef.scala:281)
> 	at org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
> 	at org.apache.spark.sql.execution.InSubqueryExec.eval(subquery.scala:151)
> 	at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
> 	at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2(HashJoin.scala:146)
> 	at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2$adapted(HashJoin.scala:146)
> 	at org.apache.spark.sql.execution.joins.HashJoin.$anonfun$outerJoin$1(HashJoin.scala:205)
> {noformat}
> Both the predicate codegen and the evaluation fail for the same reason: {{PlanSubqueries}} creates {{InSubqueryExec}} with {{shouldBroadcast=false}}. The driver waits for the subquery to finish, but it's the executor that uses the results of the subquery (for predicate codegen or evaluation). Because {{shouldBroadcast}} is set to false, the result is stored in a transient field ({{InSubqueryExec#result}}), so the result of the subquery is not serialized when the {{InSubqueryExec}} instance is sent to the executor.
> When wholestage codegen is enabled, the predicate codegen happens on the driver, so the subquery's result is available. When adaptive execution is enabled, {{PlanAdaptiveSubqueries}} always sets {{shouldBroadcast=true}}, so the subquery's result is available on the executor, if needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org