Posted to issues@spark.apache.org by "Wang Shuo (Jira)" <ji...@apache.org> on 2019/09/23 09:55:00 UTC
[jira] [Updated] (SPARK-29213) Make it consistent when getting notnull output and generating null checks in FilterExec
[ https://issues.apache.org/jira/browse/SPARK-29213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wang Shuo updated SPARK-29213:
------------------------------
Summary: Make it consistent when getting notnull output and generating null checks in FilterExec (was: Make it consistent when get notnull output and generate null checks in FilteExec)
> Make it consistent when getting notnull output and generating null checks in FilterExec
> -----------------------------------------------------------------------------------------
>
> Key: SPARK-29213
> URL: https://issues.apache.org/jira/browse/SPARK-29213
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.4
> Reporter: Wang Shuo
> Priority: Major
>
> Currently, FilterExec uses two different criteria to compute its output nullability and to generate null checks. As a result, a nullable attribute can be treated as not nullable by mistake.
> In FilterExec.output, an attribute is marked as not nullable when its `exprId` appears in `notNullAttributes`:
> {code:java}
> a.nullable && notNullAttributes.contains(a.exprId)
> {code}
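>
> For context, here is a sketch of where `notNullPreds` and `notNullAttributes` come from (paraphrased from the Spark 2.4 FilterExec source; treat the exact shape as an approximation, not a verbatim quote):
> {code:java}
> // Conjuncts of the form IsNotNull(e), where e is null intolerant and only
> // references the child's output, are split off as notNullPreds.
> private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
>   case IsNotNull(e) => isNullIntolerant(e) && e.references.subsetOf(child.outputSet)
>   case _ => false
> }
>
> // Every attribute *referenced* by a notNullPred is treated as not nullable,
> // even when the IsNotNull child is an expression such as cast(x) rather than x itself.
> private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId)
>
> override def output: Seq[Attribute] = child.output.map { a =>
>   if (a.nullable && notNullAttributes.contains(a.exprId)) a.withNullability(false) else a
> }
> {code}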
> But in FilterExec.doConsume, whether a null check is generated for an attribute is decided by whether some `IsNotNull` predicate has a child that is semantically equal to that attribute:
> {code:java}
> val nullChecks = c.references.map { r =>
>   val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r) }
>   if (idx != -1 && !generatedIsNotNullChecks(idx)) {
>     generatedIsNotNullChecks(idx) = true
>     // Use the child's output. The nullability is what the child produced.
>     genPredicate(notNullPreds(idx), input, child.output)
>   } else {
>     ""
>   }
> }.mkString("\n").trim
> {code}
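>
> These two criteria disagree as soon as the `IsNotNull` child is not a bare attribute. In the repro below, the optimizer produces `isnotnull(cast(x#7L as string))` next to `length(cast(x#7L as string)) > 0`: FilterExec.output marks `x#7L` as not nullable because `x#7L` is among the predicate's references, but doConsume never emits the corresponding null check because `cast(x#7L as string)` is not semantically equal to `x#7L`. A minimal standalone illustration of the mismatch using catalyst's expression API (a hypothetical snippet, not taken from the Spark source):
> {code:java}
> import org.apache.spark.sql.catalyst.expressions._
> import org.apache.spark.sql.types._
>
> val x = AttributeReference("x", LongType)()
> val pred = IsNotNull(Cast(x, StringType))
>
> // output side: x is among the predicate's references, so x is marked not nullable
> pred.references.contains(x)   // true
>
> // doConsume side: the IsNotNull child is Cast(x), not x, so no null check is emitted
> pred.child.semanticEquals(x)  // false
> {code}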
>
> An NPE is thrown when running the SQL below:
> {code:java}
> sql("create table table1(x string)")
> sql("create table table2(x bigint)")
> sql("create table table3(x string)")
> sql("insert into table2 select null as x")
> sql(
>   """
>     |select t1.x
>     |from (
>     |  select x from table1) t1
>     |left join (
>     |  select x from (
>     |    select x from table2
>     |    union all
>     |    select substr(x,5) x from table3
>     |  ) a
>     |  where length(x)>0
>     |) t3
>     |on t1.x=t3.x
>   """.stripMargin).collect()
> {code}
>
> The NPE stack trace:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:40)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:135)
>     at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:94)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>     at org.apache.spark.scheduler.Task.run(Task.scala:127)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:449)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:452)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
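> The dump below can be reproduced with Spark's codegen debugger (a sketch; `debugCodegen()` is the helper from `org.apache.spark.sql.execution.debug`, and `df` stands for the repro DataFrame built above):
> {code:java}
> import org.apache.spark.sql.execution.debug._
>
> // Prints each whole-stage codegen subtree with its generated Java source,
> // in the "== Subtree i / n ==" format shown below.
> df.debugCodegen()
> {code}
>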
> The generated code (the NPE site, generated.java line 40, is `filter_value_1 = (filter_value_2).numChars();`, which dereferences the possibly-null cast result before the only null check, at line 51, runs):
> {code:java}
> == Subtree 4 / 5 ==
> *(2) Project [cast(x#7L as string) AS x#9]
> +- *(2) Filter ((length(cast(x#7L as string)) > 0) AND isnotnull(cast(x#7L as string)))
>    +- Scan hive default.table2 [x#7L], HiveTableRelation `default`.`table2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [x#7L]
> Generated code:
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIteratorForCodegenStage2(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ // codegenStageId=2
> /* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
> /* 007 */   private Object[] references;
> /* 008 */   private scala.collection.Iterator[] inputs;
> /* 009 */   private scala.collection.Iterator inputadapter_input_0;
> /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] filter_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
> /* 011 */
> /* 012 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
> /* 013 */     this.references = references;
> /* 014 */   }
> /* 015 */
> /* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
> /* 017 */     partitionIndex = index;
> /* 018 */     this.inputs = inputs;
> /* 019 */     inputadapter_input_0 = inputs[0];
> /* 020 */     filter_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
> /* 021 */     filter_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
> /* 022 */
> /* 023 */   }
> /* 024 */
> /* 025 */   protected void processNext() throws java.io.IOException {
> /* 026 */     while ( inputadapter_input_0.hasNext()) {
> /* 027 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
> /* 028 */
> /* 029 */       do {
> /* 030 */         boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
> /* 031 */         long inputadapter_value_0 = inputadapter_isNull_0 ?
> /* 032 */         -1L : (inputadapter_row_0.getLong(0));
> /* 033 */
> /* 034 */         boolean filter_isNull_2 = inputadapter_isNull_0;
> /* 035 */         UTF8String filter_value_2 = null;
> /* 036 */         if (!inputadapter_isNull_0) {
> /* 037 */           filter_value_2 = UTF8String.fromString(String.valueOf(inputadapter_value_0));
> /* 038 */         }
> /* 039 */         int filter_value_1 = -1;
> /* 040 */         filter_value_1 = (filter_value_2).numChars();
> /* 041 */
> /* 042 */         boolean filter_value_0 = false;
> /* 043 */         filter_value_0 = filter_value_1 > 0;
> /* 044 */         if (!filter_value_0) continue;
> /* 045 */
> /* 046 */         boolean filter_isNull_6 = inputadapter_isNull_0;
> /* 047 */         UTF8String filter_value_6 = null;
> /* 048 */         if (!inputadapter_isNull_0) {
> /* 049 */           filter_value_6 = UTF8String.fromString(String.valueOf(inputadapter_value_0));
> /* 050 */         }
> /* 051 */         if (!(!filter_isNull_6)) continue;
> /* 052 */
> /* 053 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
> /* 054 */
> /* 055 */         boolean project_isNull_0 = false;
> /* 056 */         UTF8String project_value_0 = null;
> /* 057 */         if (!false) {
> /* 058 */           project_value_0 = UTF8String.fromString(String.valueOf(inputadapter_value_0));
> /* 059 */         }
> /* 060 */         filter_mutableStateArray_0[1].reset();
> /* 061 */
> /* 062 */         filter_mutableStateArray_0[1].zeroOutNullBytes();
> /* 063 */
> /* 064 */         if (project_isNull_0) {
> /* 065 */           filter_mutableStateArray_0[1].setNullAt(0);
> /* 066 */         } else {
> /* 067 */           filter_mutableStateArray_0[1].write(0, project_value_0);
> /* 068 */         }
> /* 069 */         append((filter_mutableStateArray_0[1].getRow()));
> /* 070 */
> /* 071 */       } while(false);
> /* 072 */       if (shouldStop()) return;
> /* 073 */     }
> /* 074 */   }
> /* 075 */
> /* 076 */ }
> {code}
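>
> One possible direction to restore consistency (a sketch only, not a tested patch) is to make the doConsume null-check lookup use the same references-based membership test that FilterExec.output uses, instead of `semanticEquals` on the `IsNotNull` child:
> {code:java}
> // Sketch: emit a null check whenever the attribute r is referenced by a
> // notNullPred, mirroring the notNullAttributes test in FilterExec.output.
> val nullChecks = c.references.map { r =>
>   val idx = notNullPreds.indexWhere { n =>
>     n.asInstanceOf[IsNotNull].child.references.contains(r)
>   }
>   if (idx != -1 && !generatedIsNotNullChecks(idx)) {
>     generatedIsNotNullChecks(idx) = true
>     genPredicate(notNullPreds(idx), input, child.output)
>   } else {
>     ""
>   }
> }.mkString("\n").trim
> {code}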
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org