Posted to reviews@spark.apache.org by "wankunde (via GitHub)" <gi...@apache.org> on 2023/07/28 11:22:33 UTC

[GitHub] [spark] wankunde commented on pull request #42206: [SPARK-44582] Skip iterator on SMJ if it was cleaned up

wankunde commented on PR #42206:
URL: https://github.com/apache/spark/pull/42206#issuecomment-1655520091

   Adding the inner SMJ generated code for a better understanding.
   
   ```java
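   //   smj_findNextJoinRows_0: advance the streamed side and collect all buffered rows with the
   //   same join key into smj_matches_0; returns true once the current streamed row has matches,
   //   false when the streamed side (or the buffered side, with no matches) is exhausted.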
   /* 036 */   private boolean smj_findNextJoinRows_0(
   /* 037 */     scala.collection.Iterator streamedIter,
   /* 038 */     scala.collection.Iterator bufferedIter) {
   /* 039 */     smj_streamedRow_0 = null;
   /* 040 */     int comp = 0;
   /* 041 */     while (smj_streamedRow_0 == null) {
   /* 042 */       if (!streamedIter.hasNext()) return false;  // 2.1.2 Window 1 reads one group of data plus the first row of the next group (named X), then returns the first row of the first group.
   /* 043 */       smj_streamedRow_0 = (InternalRow) streamedIter.next();
   /* 044 */       int smj_value_0 = smj_streamedRow_0.getInt(0);
   /* 045 */       if (false) {
   /* 046 */         smj_streamedRow_0 = null;
   /* 047 */         continue;
   /* 048 */
   /* 049 */       }
   /* 050 */       if (!smj_matches_0.isEmpty()) {
   /* 051 */         comp = 0;
   /* 052 */         if (comp == 0) {
   /* 053 */           comp = (smj_value_0 > smj_value_3 ? 1 : smj_value_0 < smj_value_3 ? -1 : 0);
   /* 054 */         }
   /* 055 */
   /* 056 */         if (comp == 0) {
   /* 057 */           return true;
   /* 058 */         }
   /* 059 */         smj_matches_0.clear();
   /* 060 */       }
   /* 061 */
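   //   Scan the buffered side: skip buffered rows with a smaller key, collect equal-key rows
   //   into smj_matches_0, and stop once a larger buffered key means no further match is possible.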
   /* 062 */       do {
   /* 063 */         if (smj_bufferedRow_0 == null) {
   /* 064 */           if (!bufferedIter.hasNext()) {
   /* 065 */             smj_value_3 = smj_value_0;
   /* 066 */             return !smj_matches_0.isEmpty();  // 2.2.1 Sort 2 and Window 2 are empty, so there are no matched rows and the SMJ will finish.
   /* 067 */           }
   /* 068 */           smj_bufferedRow_0 = (InternalRow) bufferedIter.next();
   /* 069 */           int smj_value_1 = smj_bufferedRow_0.getInt(0);
   /* 070 */           if (false) {
   /* 071 */             smj_bufferedRow_0 = null;
   /* 072 */             continue;
   /* 073 */           }
   /* 074 */           smj_value_2 = smj_value_1;
   /* 075 */         }
   /* 076 */
   /* 077 */         comp = 0;
   /* 078 */         if (comp == 0) {
   /* 079 */           comp = (smj_value_0 > smj_value_2 ? 1 : smj_value_0 < smj_value_2 ? -1 : 0);
   /* 080 */         }
   /* 081 */
   /* 082 */         if (comp > 0) {
   /* 083 */           smj_bufferedRow_0 = null;
   /* 084 */         } else if (comp < 0) {
   /* 085 */           if (!smj_matches_0.isEmpty()) {
   /* 086 */             smj_value_3 = smj_value_0;
   /* 087 */             return true;
   /* 088 */           } else {
   /* 089 */             smj_streamedRow_0 = null;
   /* 090 */           }
   /* 091 */         } else {
   /* 092 */           smj_matches_0.add((UnsafeRow) smj_bufferedRow_0);
   /* 093 */           smj_bufferedRow_0 = null;
   /* 094 */         }
   /* 095 */       } while (smj_streamedRow_0 != null);
   /* 096 */     }
   /* 097 */     return false; // unreachable
   /* 098 */   }
   /* 099 */
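   //   processNext: whole-stage-codegen driver loop; for each streamed row with matches, emit one
   //   output row per buffered match, and clean up the join's resources when the loop finishes.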
   /* 100 */   protected void processNext() throws java.io.IOException {
   /* 101 */     if (!wholestagecodegen_initJoin_0) {
   /* 102 */       wholestagecodegen_initJoin_0 = true;
   //   ... initialization lines elided ...
   /* ... */     }
   /* 113 */
   /* 114 */     while (smj_findNextJoinRows_0(smj_streamedInput_0, smj_bufferedInput_0)) {
   /* 115 */       int smj_value_4 = -1;
   /* 116 */       int smj_value_5 = -1;
   /* 117 */       smj_value_4 = smj_streamedRow_0.getInt(0);
   /* 118 */       smj_value_5 = smj_streamedRow_0.getInt(1);
   /* 119 */       scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator();
   /* 120 */
   /* 121 */       while (smj_iterator_0.hasNext()) {
   /* 122 */         InternalRow smj_bufferedRow_1 = (InternalRow) smj_iterator_0.next();
   //   Append output rows
   /* 137 */         append((smj_mutableStateArray_0[0].getRow()).copy());
   /* 138 */
   /* 139 */       }
   /* 140 */       if (shouldStop()) return;
   /* 141 */     }
   /* 142 */     ((org.apache.spark.sql.execution.joins.SortMergeJoinExec) references[1] /* plan */).cleanupResources();  // 2.3 SMJ calls earlyCleanupResources() to free off-heap memory.
   /* 143 */   }
   
   ```
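
   As a hedged illustration of the "skip iterator on SMJ if it was cleaned up" idea in the title: once cleanupResources() at step 2.3 has freed the sorters' off-heap memory, the streamed/buffered iterators should behave as exhausted, so smj_findNextJoinRows_0 exits through its !hasNext() branches instead of touching freed pages. The sketch below is not the actual patch; the wrapper class, the `cleanedUp` flag, and the use of java.util.Iterator (the generated code uses scala.collection.Iterator) are illustrative assumptions.
   
   ```java
   import java.util.Iterator;
   
   // Illustrative sketch only: an iterator wrapper that reports exhaustion after cleanup,
   // so the generated join loop above exits early instead of reading freed memory.
   final class CleanupAwareIterator<T> implements Iterator<T> {
       private final Iterator<T> delegate;
       private boolean cleanedUp = false;   // hypothetical flag, set when resources are freed
   
       CleanupAwareIterator(Iterator<T> delegate) {
           this.delegate = delegate;
       }
   
       // Would be invoked from cleanupResources() in this sketch.
       void markCleanedUp() {
           cleanedUp = true;
       }
   
       @Override
       public boolean hasNext() {
           // After cleanup, pretend the input is exhausted so smj_findNextJoinRows_0
           // returns via its !hasNext() branches.
           return !cleanedUp && delegate.hasNext();
       }
   
       @Override
       public T next() {
           return delegate.next();
       }
   }
   ```
   
   With such a guard, a later processNext() call would see empty inputs and skip the join loop rather than re-reading buffers that have already been freed.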

