You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Josh Rosen (Jira)" <ji...@apache.org> on 2022/01/04 19:04:00 UTC

[jira] [Resolved] (SPARK-37784) CodeGenerator.addBufferedState() does not properly handle UDTs

     [ https://issues.apache.org/jira/browse/SPARK-37784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen resolved SPARK-37784.
--------------------------------
    Fix Version/s: 3.3.0
                   3.0.4
                   3.2.1
                   3.1.3
       Resolution: Fixed

Issue resolved by pull request 35066
[https://github.com/apache/spark/pull/35066]

> CodeGenerator.addBufferedState() does not properly handle UDTs
> --------------------------------------------------------------
>
>                 Key: SPARK-37784
>                 URL: https://issues.apache.org/jira/browse/SPARK-37784
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Josh Rosen
>            Assignee: Josh Rosen
>            Priority: Major
>              Labels: correctness
>             Fix For: 3.3.0, 3.0.4, 3.2.1, 3.1.3
>
>
> The {{CodeGenerator.addBufferedState()}} method does not properly handle UDTs: it pattern-matches on a data type to determine whether {{copy()}} or {{clone()}} operations need to be performed but the current pattern match does not handle UDTs and instead falls through to the default case which causes values to be stored without copying. This is problematic if the UDT's underlying data type requires copying (i.e. the UDT is internally represented using an array, struct, map, or sting type).
> This issue impacts queries which use sort-merge join where UDTs appear as part of join keys.
> I discovered this while investigating a query which failed with segfaults. I managed to shrink my original query down to the following reproduction (which uses Spark's built-in Vector UDT):
> {code:java}
> import org.apache.spark.ml.linalg.Vectors
> val df = spark.createDataFrame(
>   Seq(
>     (Vectors.dense(1.0), Vectors.dense(1.0)),
>     (Vectors.dense(1.0), Vectors.dense(2.0))
>   )).toDF("key", "value")
> sql("set spark.sql.adaptive.enabled = false")
> sql("set spark.sql.autoBroadcastJoinThreshold = -1")
> sql("set spark.sql.shuffle.partitions = 1")df.join(df, "key").show()
> df.join(df, "key").explain("codegen")
> df.join(df, "key").show() {code}
> When run with off-heap memory enabled, this failed with a segfault at the stack
> {code:java}
> Stack: [0x00007f518a7b5000,0x00007f518abb6000],  sp=0x00007f518abb32e0,  free space=4088k
> Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
> J 12956 C1 org.apache.spark.unsafe.Platform.getLong(Ljava/lang/Object;J)J (9 bytes) @ 0x00007f5e5ec2809e [0x00007f5e5ec28060+0x3e]
> j  org.apache.spark.unsafe.array.ByteArrayMethods.arrayEquals(Ljava/lang/Object;JLjava/lang/Object;JJ)Z+135
> j  org.apache.spark.sql.catalyst.expressions.UnsafeRow.equals(Ljava/lang/Object;)Z+44
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_compareStruct_0(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I+16
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.smj_findNextInnerJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage1;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z+107
> j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext()V+393
> j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
> j  org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z+4{code}
> Please note that this particular reproduction does not fail in all environments (I can't reproduce it on my laptop, for example, or on certain EC2 instance types).
> Here is an annotated excerpt from the generated code which shows the source of the problem:
> {code:java}
> /* 223 */   private boolean smj_findNextJoinRows_0(
> /* 224 */     scala.collection.Iterator streamedIter,
> /* 225 */     scala.collection.Iterator bufferedIter) {
> /* 226 */     smj_streamedRow_0 = null;
> /* 227 */     int comp = 0;
> /* 228 */     while (smj_streamedRow_0 == null) {
> /* 229 */       if (!streamedIter.hasNext()) return false;
> /* 230 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
> /* 231 */       boolean smj_isNull_0 = smj_streamedRow_0.isNullAt(0);
> // smj_value_0 is a value retrieved from a streamed row:
> //                              |
> //                              V
> /* 232 */       InternalRow smj_value_0 = smj_isNull_0 ?
> /* 233 */       null : (smj_streamedRow_0.getStruct(0, 4));
> [...]
> // This value is stored in smj_mutableStateArray_0[0] without
> // copying (even though it's a struct, not an atomic type):
> /* 265 */           smj_mutableStateArray_0[0] = smj_value_1;
> /* 266 */         }{code}
> I believe the fix for this bug is fairly simple: we just need to modify {{CodeGenerator.addBufferedState()}} so that it uses UDTs' underlying sqlType when determining whether value copying is needed. 
> I've labeled this as a correctness issue because a "missing copying" bug can theoretically lead to wrong query results, not just crashes, although I haven't been able to contrive a test case demonstrating a wrong result due to this bug.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org