You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "wankunde (via GitHub)" <gi...@apache.org> on 2023/07/28 10:47:40 UTC

[GitHub] [spark] wankunde opened a new pull request, #42206: [SPARK-44582] Skip iterator on SMJ if it was cleaned up

wankunde opened a new pull request, #42206:
URL: https://github.com/apache/spark/pull/42206

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   
   Bugfix for SMJ which may cause JVM crash.
   
   **When will the JVM crash**
   
   Query pattern: 
   
   TableScan     TableScan
      |              |
    Exchange      Exchange
      |              |
     Sort 1         Sort 2
      |              |
    Window 1      Window 2
       \          /
         \      /
           SMJ
            |
            |
     WriteFileCommand
   
   1. WriteFileCommand call hasNext() to check if the input is empty.
   2. SMJ call findNextJoinRows() to find all matched rows.
   2.1 SMJ tries to get the first row in the left child.
   2.1.1 Sort 1 will sort all the input rows in the Offheap memory.
   2.1.2 Window 1 will read one group data and the first row in next group (named X), return the first row in the first group.
   2.2 SMJ tries to get the first row in the right child.
   2.2.1 Sort 2 and Window 2 are empty, do nothing.
   2.3  Inner SMJ will finish, since there will definitely be no join rows, call earlyCleanupResources() to free offHeap memory.
   3. WriteFileCommand call hasNext() again to write the input data to the files.
   4. SMJ call findNextJoinRows() to find all matched rows.
   4.1 SMJ tries to get the first row in the left child.
   4.2 Window 1 tries to add row X into the group buffer, which will accesse unallocated memory, the JVM may or may not crash.
   
   In this PR, if SMJ has already been cleaned up, skip iterator on it.
   
   ### Why are the changes needed?
   
   Bugfix for SMJ.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   
   ### How was this patch tested?
   
   Test in our production environment.
   For unsafe API, when read the unallocated memory, the program may get the old value, or get a unexpected value, or cause the JVM crash.
   I don't think the UIT will be stable.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon closed pull request #42206: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #42206: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up
URL: https://github.com/apache/spark/pull/42206


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on pull request #42206: [SPARK-44582] Skip iterator on SMJ if it was cleaned up

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1655520091

   Add the inner SMJ gen-code for better understand.
   
   ```java
   /* 036 */   private boolean smj_findNextJoinRows_0(
   /* 037 */     scala.collection.Iterator streamedIter,
   /* 038 */     scala.collection.Iterator bufferedIter) {
   /* 039 */     smj_streamedRow_0 = null;
   /* 040 */     int comp = 0;
   /* 041 */     while (smj_streamedRow_0 == null) {
   /* 042 */       if (!streamedIter.hasNext()) return false;  // 2.1.2 Window 1 will read one group data and the first row in next group (named X), return the first row in the first group.
   /* 043 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
   /* 044 */       int smj_value_0 = smj_streamedRow_0.getInt(0);
   /* 045 */       if (false) {
   /* 046 */         smj_streamedRow_0 = null;
   /* 047 */         continue;
   /* 048 */
   /* 049 */       }
   /* 050 */       if (!smj_matches_0.isEmpty()) {
   /* 051 */         comp = 0;
   /* 052 */         if (comp == 0) {
   /* 053 */           comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 < smj_value_3 ? -1 : 0);
   /* 054 */         }
   /* 055 */
   /* 056 */         if (comp == 0) {
   /* 057 */           return true;
   /* 058 */         }
   /* 059 */         smj_matches_0.clear();
   /* 060 */       }
   /* 061 */
   /* 062 */       do {
   /* 063 */         if (smj_bufferedRow_0 == null) {
   /* 064 */           if (!bufferedIter.hasNext()) {
   /* 065 */             smj_value_3 = smj_value_0;
   /* 066 */             return !smj_matches_0.isEmpty();  // 2.2.1 Sort 2 and Window 2 are empty, no matched rows, and SMJ will finish. 
   /* 067 */           }
   /* 068 */           smj_bufferedRow_0 = (InternalRow) bufferedIter.next();
   /* 069 */           int smj_value_1 = smj_bufferedRow_0.getInt(0);
   /* 070 */           if (false) {
   /* 071 */             smj_bufferedRow_0 = null;
   /* 072 */             continue;
   /* 073 */           }
   /* 074 */           smj_value_2 = smj_value_1;
   /* 075 */         }
   /* 076 */
   /* 077 */         comp = 0;
   /* 078 */         if (comp == 0) {
   /* 079 */           comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 < smj_value_2 ? -1 : 0);
   /* 080 */         }
   /* 081 */
   /* 082 */         if (comp > 0) {
   /* 083 */           smj_bufferedRow_0 = null;
   /* 084 */         } else if (comp < 0) {
   /* 085 */           if (!smj_matches_0.isEmpty()) {
   /* 086 */             smj_value_3 = smj_value_0;
   /* 087 */             return true;
   /* 088 */           } else {
   /* 089 */             smj_streamedRow_0 = null;
   /* 090 */           }
   /* 091 */         } else {
   /* 092 */           smj_matches_0.add((UnsafeRow) smj_bufferedRow_0);
   /* 093 */           smj_bufferedRow_0 = null;
   /* 094 */         }
   /* 095 */       } while (smj_streamedRow_0 != null);
   /* 096 */     }
   /* 097 */     return false; // unreachable
   /* 098 */   }
   /* 099 */
   /* 100 */   protected void processNext() throws java.io.IOException {
   /* 101 */     if (!wholestagecodegen_initJoin_0) {
   /* 102 */       wholestagecodegen_initJoin_0 = true;
   /* 113 */
   /* 114 */     while (smj_findNextJoinRows_0(smj_streamedInput_0, smj_bufferedInput_0)) {
   /* 115 */       int smj_value_4 = -1;
   /* 116 */       int smj_value_5 = -1;
   /* 117 */       smj_value_4 = smj_streamedRow_0.getInt(0);
   /* 118 */       smj_value_5 = smj_streamedRow_0.getInt(1);
   /* 119 */       scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator();
   /* 120 */
   /* 121 */       while (smj_iterator_0.hasNext()) {
   /* 122 */         InternalRow smj_bufferedRow_1 = (InternalRow) smj_iterator_0.next();
   //   Append output rows
   /* 137 */         append((smj_mutableStateArray_0[0].getRow()).copy());
   /* 138 */
   /* 139 */       }
   /* 140 */       if (shouldStop()) return;
   /* 141 */     }
   /* 142 */     ((org.apache.spark.sql.execution.joins.SortMergeJoinExec) references[1] /* plan */).cleanupResources();  // 2.3 SMJ call earlyCleanupResources() to free offHeap memory.
   /* 143 */   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #42206: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1657327012

   cc @rednaxelafx @cloud-fan @bersprockets FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on pull request #42206: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1661529867

   Hi, @bersprockets Thanks for the detailed explanation.
   
   The `WindowEvaluatorFactory` was recently extracted from `WindowExec` without any change of the main logic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #42206: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1665029179

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] bersprockets commented on pull request #42206: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

Posted by "bersprockets (via GitHub)" <gi...@apache.org>.
bersprockets commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1664996034

   Thanks. Looks good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] bersprockets commented on pull request #42206: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

Posted by "bersprockets (via GitHub)" <gi...@apache.org>.
bersprockets commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1661030654

   While I have not yet looked at the solution provided here, I can confirm that the bug does indeed exist on the master branch.
   
   I was able to reproduce, but I had to have some debug statements for the timing to be right. My debug statements don't change logic or access any data structures (other than thread name), so it's not changing behavior, just timing. In more complex jobs with lots going on in the JVM, the timing might be right without debug statements.
   
   To get the SIGSEGV, the WindowExec iterator needs a `nextRow` value (called X in the PR description) whose internal fields are pointing to offheap memory held (indirectly) by `SortExec`. When the generated SMJ code calls `cleanUpResources` because the right side is empty, the offheap memory held (indirectly) by `SortExec` gets released.
   
   Later, when the `WindowExec` iterator calls `buffer.add(nextRow)`, the buffer tries to copy data from the cleaned up offheap page pointed at by `nextRow`. If you're lucky (which you usually are), the memory is still part of the JVM's process, so there's no crash. However, if that memory has been removed from the JVM's process, you get the SIGSEGV:
   ```
   bash-3.2$ bin/spark-sql --driver-memory 450m --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=3m --master "local[2]" --conf spark.sql.shuffle.partitions=2
   ...
   spark-sql (default)> create or replace temp view leftside as
   select a, b, c, sum(c) over(partition by a order by b range between 2 preceding and current row) as sumc
   from t1;
   Time taken: 1.692 seconds
   spark-sql (default)> create or replace temp view rightside as
   select a, b, c, sum(c) over(partition by a order by b range between 2 preceding and current row) as sumc
   from t2;
   Time taken: 0.082 seconds
   spark-sql (default)> set spark.sql.adaptive.enabled=false;
   spark.sql.adaptive.enabled	false
   Time taken: 0.11 seconds, Fetched 1 row(s)
   spark-sql (default)> set spark.sql.autoBroadcastJoinThreshold=-1;
   spark.sql.autoBroadcastJoinThreshold	-1
   Time taken: 0.018 seconds, Fetched 1 row(s)
   spark-sql (default)> create or replace temp view joined as
   select l.a, l.sumc, r.a as ra, r.sumc as rsumc
   from leftside l
   join rightside r
   on l.a = r.a;
   Time taken: 0.178 seconds
   spark-sql (default)> drop table if exists myoutput; create table myoutput stored as parquet as select * from joined;
   23/08/01 12:50:00 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
   Time taken: 0.571 seconds
   23/08/01 12:50:00 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
   23/08/01 12:50:01 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
   23/08/01 12:50:01 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
   23/08/01 12:50:01 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
   23/08/01 12:50:01 WARN HiveMetaStore: Location: file:/Users/bruce/github/spark_fork_smj_issue/spark-warehouse/myoutput specified for non-external table:myoutput
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) &&& sorter cleanupResources
   Executor task launch worker for task 0.0 in stage 2.0 (TID 2) &&& sorter cleanupResources
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) >>> In processRows
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) ^^^ WindowExec fetching next partition
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) %%% Adding row to buffer
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) ^^^ WindowExec getting next from buffer
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) +++ Getting next streamed row
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) +++ Returning false
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) >>> Performing eager cleanup
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) &&& sorter cleanupResources
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) &&& sorter cleanupResources
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) >>> In processRows
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) ^^^ WindowExec fetching next partition
   Executor task launch worker for task 1.0 in stage 2.0 (TID 3) %%% Adding row to buffer
   #
   # A fatal error has been detected by the Java Runtime Environment:
   #
   #  SIGSEGV (0xb) at pc=0x000000010f4c19a0, pid=96472, tid=52259
   #
   # JRE version: Java(TM) SE Runtime Environment 18.9 (11.0.12+8) (build 11.0.12+8-LTS-237)
   # Java VM: Java HotSpot(TM) 64-Bit Server VM 18.9 (11.0.12+8-LTS-237, mixed mode, tiered, compressed oops, g1 gc, bsd-amd64)
   # Problematic frame:
   # V  [libjvm.dylib+0x1329a0]  acl_CopyRight+0x29
   #
   ...
   ```
   The error report stack trace looks slightly different than that in the PR description, because I am running this on the master branch (which uses `WindowEvaluatorFactory`), but it dies in the same place (`ExternalAppendOnlyUnsafeRowArray.add`):
   ```
   Stack: [0x0000700010666000,0x0000700010766000],  sp=0x0000700010764308,  free space=1016k
   Native frames: (J=compiled Java code, A=aot compiled Java code, j=interpreted, Vv=VM code, C=native code)
   V  [libjvm.dylib+0x1329a0]  acl_CopyRight+0x29
   J 4228  jdk.internal.misc.Unsafe.copyMemory0(Ljava/lang/Object;JLjava/lang/Object;JJ)V java.base@11.0.12 (0 bytes) @ 0x00000001249c88b4 [0x00000001249c87c0+0x00000000000000f4]
   J 4226 c1 jdk.internal.misc.Unsafe.copyMemory(Ljava/lang/Object;JLjava/lang/Object;JJ)V java.base@11.0.12 (33 bytes) @ 0x000000011da6e92c [0x000000011da6e0e0+0x000000000000084c]
   j  sun.misc.Unsafe.copyMemory(Ljava/lang/Object;JLjava/lang/Object;JJ)V+11 jdk.unsupported@11.0.12
   j  org.apache.spark.unsafe.Platform.copyMemory(Ljava/lang/Object;JLjava/lang/Object;JJ)V+34
   j  org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy()Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;+37
   j  org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)V+48
   j  org.apache.spark.sql.execution.window.WindowEvaluatorFactory$WindowPartitionEvaluator$$anon$1.fetchNextPartition()V+55
   j  org.apache.spark.sql.execution.window.WindowEvaluatorFactory$WindowPartitionEvaluator$$anon$1.next()Lorg/apache/spark/sql/catalyst/InternalRow;+61
   j  org.apache.spark.sql.execution.window.WindowEvaluatorFactory$WindowPartitionEvaluator$$anon$1.next()Ljava/lang/Object;+1
   j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext()V+7
   j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
   j  org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext()Z+4
   j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage7;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z+11
   j  org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext()V+412
   j  org.apache.spark.sql.execution.BufferedRowIterator.hasNext()Z+11
   j  org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext()Z+4
   j  org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(Lscala/collection/Iterator;)V+3
   j  org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(Lorg/apache/spark/sql/execution/datasources/FileFormatDataWriter;Lscala/collection/Iterator;)Lorg/apache/spark/sql/execution/datasources/WriteTaskResult;+2
   ...
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wankunde commented on pull request #42206: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1663191874

   > It looks fine to me, except maybe check the code for left semi joins.
   > 
   > I could not make the crash happen with left semi joins. I think the bug might actually exist in that code (within the same task, I see a call to processRows _after_ eager cleanup). However, it seems that for left semi joins, the optimizer moves the `Window` after the `Join` (that is, the windowing is performed on the joined result), so there is no X row to copy.
   > 
   > By the way, there is a reason you see `processRows` called again even after `BufferedIterator.hasNext` returns false: `FileFormatWriter` calls `hasNext` to see if the iterator is empty. If it is, it instantiates an instance of `EmptyDirectoryDataWriter`, which also calls `hasNext`.
   
   Thanks for your review. Fix this issue for LeftSemi SMJ.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org