You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2019/09/27 07:41:00 UTC

[jira] [Assigned] (SPARK-29213) Make it consistent when get notnull output and generate null checks in FilterExec

     [ https://issues.apache.org/jira/browse/SPARK-29213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-29213:
-----------------------------------

    Assignee: Wang Shuo

> Make it consistent when get notnull output and generate null checks in FilterExec
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-29213
>                 URL: https://issues.apache.org/jira/browse/SPARK-29213
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4
>            Reporter: Wang Shuo
>            Assignee: Wang Shuo
>            Priority: Major
>             Fix For: 2.4.5, 3.0.0
>
>
> Currently, FilterExec determines the nullability of its output and generates null checks in two different ways. As a result, some nullable attributes can be treated as not nullable by mistake.
> In FilterExec.output, an attribute is marked as nullable or not depending on whether its `exprId` is found in notNullAttributes:
> {code:java}
> a.nullable && notNullAttributes.contains(a.exprId)
> {code}
> But in FilterExec.doConsume, whether a `nullCheck` is generated for an attribute is decided by whether there is a semantically equal IsNotNull predicate:
> {code:java}
> val nullChecks = c.references.map { r =>
>   val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
>   if (idx != -1 && !generatedIsNotNullChecks(idx)) {
>     generatedIsNotNullChecks(idx) = true
>     // Use the child's output. The nullability is what the child produced.
>     genPredicate(notNullPreds(idx), input, child.output)
>   } else {
>     ""
>   }
> }.mkString("\n").trim
> {code}
>  
> An NPE occurs when running the SQL below:
> {code:java}
> sql("create table table1(x string)")
> sql("create table table2(x bigint)")
> sql("create table table3(x string)")
> sql("insert into table2 select null as x")
> sql(
>   """
>     |select t1.x
>     |from (
>     |    select x from table1) t1
>     |left join (
>     |    select x from (
>     |        select x from table2
>     |        union all
>     |        select substr(x,5) x from table3
>     |    ) a
>     |    where length(x)>0
>     |) t3
>     |on t1.x=t3.x
>   """.stripMargin).collect()
> {code}
>  
> NPE Exception:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:40)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:135)
>     at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>     at org.apache.spark.scheduler.Task.run(Task.scala:127)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:449)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:452)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> the generated code:
> {code:java}
> == Subtree 4 / 5 ==
> *(2) Project [cast(x#7L as string) AS x#9]
> +- *(2) Filter ((length(cast(x#7L as string)) > 0) AND isnotnull(cast(x#7L as string)))
>    +- Scan hive default.table2 [x#7L], HiveTableRelation `default`.`table2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [x#7L]
> Generated code:
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIteratorForCodegenStage2(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ // codegenStageId=2
> /* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
> /* 007 */   private Object[] references;
> /* 008 */   private scala.collection.Iterator[] inputs;
> /* 009 */   private scala.collection.Iterator inputadapter_input_0;
> /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
> /* 011 */
> /* 012 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
> /* 013 */     this.references = references;
> /* 014 */   }
> /* 015 */
> /* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
> /* 017 */     partitionIndex = index;
> /* 018 */     this.inputs = inputs;
> /* 019 */     inputadapter_input_0 = inputs[0];
> /* 020 */     filter_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
> /* 021 */     filter_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
> /* 022 */
> /* 023 */   }
> /* 024 */
> /* 025 */   protected void processNext() throws java.io.IOException {
> /* 026 */     while ( inputadapter_input_0.hasNext()) {
> /* 027 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
> /* 028 */
> /* 029 */       do {
> /* 030 */         boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
> /* 031 */         long inputadapter_value_0 = inputadapter_isNull_0 ?
> /* 032 */         -1L : (inputadapter_row_0.getLong(0));
> /* 033 */
> /* 034 */         boolean filter_isNull_2 = inputadapter_isNull_0;
> /* 035 */         UTF8String filter_value_2 = null;
> /* 036 */         if (!inputadapter_isNull_0) {
> /* 037 */           filter_value_2 = UTF8String.fromString(String.valueOf(inputadapter_value_0));
> /* 038 */         }
> /* 039 */         int filter_value_1 = -1;
> /* 040 */         filter_value_1 = (filter_value_2).numChars();
> /* 041 */
> /* 042 */         boolean filter_value_0 = false;
> /* 043 */         filter_value_0 = filter_value_1 > 0;
> /* 044 */         if (!filter_value_0) continue;
> /* 045 */
> /* 046 */         boolean filter_isNull_6 = inputadapter_isNull_0;
> /* 047 */         UTF8String filter_value_6 = null;
> /* 048 */         if (!inputadapter_isNull_0) {
> /* 049 */           filter_value_6 = UTF8String.fromString(String.valueOf(inputadapter_value_0));
> /* 050 */         }
> /* 051 */         if (!(!filter_isNull_6)) continue;
> /* 052 */
> /* 053 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
> /* 054 */
> /* 055 */         boolean project_isNull_0 = false;
> /* 056 */         UTF8String project_value_0 = null;
> /* 057 */         if (!false) {
> /* 058 */           project_value_0 = UTF8String.fromString(String.valueOf(inputadapter_value_0));
> /* 059 */         }
> /* 060 */         filter_mutableStateArray_0[1].reset();
> /* 061 */
> /* 062 */         filter_mutableStateArray_0[1].zeroOutNullBytes();
> /* 063 */
> /* 064 */         if (project_isNull_0) {
> /* 065 */           filter_mutableStateArray_0[1].setNullAt(0);
> /* 066 */         } else {
> /* 067 */           filter_mutableStateArray_0[1].write(0, project_value_0);
> /* 068 */         }
> /* 069 */         append((filter_mutableStateArray_0[1].getRow()));
> /* 070 */
> /* 071 */       } while(false);
> /* 072 */       if (shouldStop()) return;
> /* 073 */     }
> /* 074 */   }
> /* 075 */
> /* 076 */ }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org