Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/02/28 11:20:23 UTC

[GitHub] [spark] cxzl25 commented on a change in pull request #10989: [SPARK-12798] [SQL] generated BroadcastHashJoin

cxzl25 commented on a change in pull request #10989: [SPARK-12798] [SQL] generated BroadcastHashJoin
URL: https://github.com/apache/spark/pull/10989#discussion_r385642305
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java
 ##########
 @@ -31,22 +33,20 @@
  * TODO: replaced it by batched columnar format.
  */
 public class BufferedRowIterator {
-  protected InternalRow currentRow;
+  protected LinkedList<InternalRow> currentRows = new LinkedList<>();
 
 Review comment:
   @cloud-fan @mgaido91 @viirya
   When ```spark.sql.codegen.wholeStage = true```, some joins caused OOM, analyzed the dump file, and found that ```BufferedRowIterator#currentRows``` holds all matching rows.
   If codegen is turned off, it runs just fine, only one matching row is generated each time.
   Increasing the executor memory may run successfully, but there is always a probability of failure, because it is not known how many rows of the current key match.
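  For context, a simplified sketch of the relevant parts of ```BufferedRowIterator``` (paraphrased; only the drain loop is shown) illustrates why everything appended in one ```processNext()``` call stays in memory at once:
  ```java
  // Simplified sketch: processNext() is only invoked once currentRows is
  // empty, so every row the generated processNext() appends in a single
  // call sits in the LinkedList until the consumer drains it.
  public boolean hasNext() throws IOException {
    if (currentRows.isEmpty()) {
      processNext();  // generated join code may append ALL matches for a key here
    }
    return !currentRows.isEmpty();
  }

  public InternalRow next() {
    return currentRows.remove();  // consumers drain only one row at a time
  }

  protected void append(InternalRow row) {
    currentRows.add(row);
  }
  ```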
   
   example:
   ```scala
      import spark.implicits._  // needed for .toDF()

      case class TestData(key: Int, value: String)

      // a single-row DataFrame whose value column is ~1 MB
      val value = "x" * 1000 * 1000
      val testData = spark.sparkContext.parallelize((1 to 1)
        .map(i => TestData(i, value))).toDF()

      // double it 10 times: 2^10 = 1024 rows, all with key = 1
      var bigData = testData
      for (_ <- Range(0, 10)) {
        bigData = bigData.union(bigData)
      }

      // every right row matches the single left key
      val testDataX = testData.as("x").selectExpr("key as xkey", "value as xvalue")
      val bigDataY = bigData.as("y").selectExpr("key as ykey", "value as yvalue")
      testDataX.join(bigDataY).where("xkey = ykey").write.saveAsTable("test")
   ```
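    With these sizes, the single left key matches 2^10 = 1024 right rows of roughly 1 MB each, so one ```processNext()``` call buffers on the order of a gigabyte in ```currentRows``` for that one key.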
   hprof:
   ![image](https://user-images.githubusercontent.com/3898450/75543711-6f100280-5a5d-11ea-969b-bad680af4edc.png)
   
  The currently generated code looks like this (simplified; ```xxRow``` stands for the joined output row):
   ```java
    protected void processNext() throws java.io.IOException {
        while (findNextInnerJoinRows(smj_leftInput_0, smj_rightInput_0)) {
          scala.collection.Iterator<UnsafeRow> smj_iterator_0 = smj_matches_0.generateIterator();
          while (smj_iterator_0.hasNext()) {
            InternalRow smj_rightRow_1 = (InternalRow) smj_iterator_0.next();
            // every matching row is appended before control returns, so all of
            // them accumulate in currentRows; shouldStop() is checked too late
            append(xxRow.copy());
          }
          if (shouldStop()) return;
        }
    }
   ```
   
  Would it be possible to change the generated code to something like this, or is there a better way?
   ```java
    private scala.collection.Iterator<UnsafeRow> smj_iterator_0;

    protected void processNext() throws java.io.IOException {
        // first drain the iterator saved by the previous call, one row per call
        if (smj_iterator_0 != null && smj_iterator_0.hasNext()) {
            InternalRow smj_rightRow_1 = (InternalRow) smj_iterator_0.next();
            append(xxRow.copy());
            if (!smj_iterator_0.hasNext()) {
                smj_iterator_0 = null;  // exhausted: find new matches on the next call
            }
            return;
        }
        while (findNextInnerJoinRows(smj_leftInput_0, smj_rightInput_0)) {
          smj_iterator_0 = smj_matches_0.generateIterator();
          if (smj_iterator_0.hasNext()) {
            InternalRow smj_rightRow_1 = (InternalRow) smj_iterator_0.next();
            append(xxRow.copy());
            if (!smj_iterator_0.hasNext()) {
                smj_iterator_0 = null;
            }
            return;
          }
          if (shouldStop()) return;
        }
    }
   ```
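  Emitting at most one joined row per ```processNext()``` call would keep ```currentRows``` bounded at a single element, matching the one-row-at-a-time behavior of the non-codegen path; the trade-off is one more call into ```processNext()``` per output row instead of per key.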
   
