Posted to commits@spark.apache.org by sr...@apache.org on 2022/04/26 13:48:06 UTC

[spark] branch master updated: [SPARK-38944][CORE][SQL] Close `UnsafeSorterSpillReader` before `SpillableArrayIterator` throws `ConcurrentModificationException`

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f1286f324c0 [SPARK-38944][CORE][SQL] Close `UnsafeSorterSpillReader` before `SpillableArrayIterator` throws `ConcurrentModificationException`
f1286f324c0 is described below

commit f1286f324c032f9a875167fdbb265b4d495752c9
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Tue Apr 26 08:47:54 2022 -0500

    [SPARK-38944][CORE][SQL] Close `UnsafeSorterSpillReader` before `SpillableArrayIterator` throws `ConcurrentModificationException`
    
    ### What changes were proposed in this pull request?
    There is an `UnsafeSorterSpillReader` resource leak (the `InputStream` held by `UnsafeSorterSpillReader`) when `SpillableArrayIterator` throws `ConcurrentModificationException`, so this PR adds a resource cleanup step before `SpillableArrayIterator` throws the exception.
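    
    The cleanup follows a simple close-before-throw pattern: release the underlying stream before surfacing the invalidation error, since callers typically abandon the iterator once the exception escapes. A minimal standalone sketch of the idea (class and parameter names here are illustrative, not Spark's):
    
    ```scala
    import java.io.{Closeable, FileInputStream}
    import java.util.ConcurrentModificationException
    
    // Illustrative stand-in for a spill-backed iterator: close the underlying
    // stream *before* throwing, so the file handle is not stranded when the
    // caller never touches the iterator again after the exception.
    class SpillBackedIterator(
        spillFile: String,
        expectedModCount: Long,
        currentModCount: () => Long) extends Iterator[Int] with Closeable {
    
      private val in = new FileInputStream(spillFile)
    
      private def throwIfModified(): Unit = {
        if (expectedModCount != currentModCount()) {
          close() // release the stream first; otherwise it leaks
          throw new ConcurrentModificationException("backing array was modified")
        }
      }
    
      override def hasNext: Boolean = { throwIfModified(); in.available() > 0 }
      override def next(): Int = { throwIfModified(); in.read() }
      override def close(): Unit = in.close()
    }
    ```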
    
    ### Why are the changes needed?
    Fix a file resource leak.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GitHub Actions (GA)
    - Add a new check in `ExternalAppendOnlyUnsafeRowArraySuite`
    
    Run the following commands:
    
    ```
    mvn clean install -pl sql/core -am -DskipTests
    mvn clean test -pl sql/core -Dtest=none -DwildcardSuites=org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArraySuite
    ```
    
    **Before**
    
    ```
    - test iterator invalidation (with spill) *** FAILED ***
      org.apache.spark.io.ReadAheadInputStream@478b0739 did not equal null (ExternalAppendOnlyUnsafeRowArraySuite.scala:397)
    Run completed in 9 seconds, 652 milliseconds.
    Total number of tests run: 14
    Suites: completed 2, aborted 0
    Tests: succeeded 13, failed 1, canceled 0, ignored 0, pending 0
    *** 1 TEST FAILED ***
    ```
    
    **After**
    
    ```
    Run completed in 8 seconds, 535 milliseconds.
    Total number of tests run: 14
    Suites: completed 2, aborted 0
    Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0
    All tests passed.
    ```
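    
    The new suite check verifies closure by reading the reader's private stream fields via reflection. A minimal sketch of that verification pattern (the `ResourceHolder` class below is hypothetical; in the suite the target is `UnsafeSorterSpillReader` and its `in`/`din` fields):
    
    ```scala
    import java.io.{Closeable, FileInputStream}
    
    object FieldCheck {
      // Read a private field by name via Java reflection -- the approach the
      // new suite check uses to assert the spill reader's streams were released.
      def getFieldValue(obj: AnyRef, fieldName: String): Any = {
        val field = obj.getClass.getDeclaredField(fieldName)
        field.setAccessible(true)
        field.get(obj)
      }
    }
    
    // Hypothetical holder standing in for a reader that nulls its stream on close.
    class ResourceHolder(path: String) extends Closeable {
      private var in: FileInputStream = new FileInputStream(path)
      override def close(): Unit = { in.close(); in = null }
    }
    
    // Usage: after close(), the private stream reference should be null.
    //   val holder = new ResourceHolder("/tmp/spill-file")
    //   holder.close()
    //   assert(FieldCheck.getFieldValue(holder, "in") == null)
    ```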
    
    Closes #36262 from LuciferYang/SPARK-38944.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../unsafe/sort/UnsafeExternalSorter.java          | 24 ++++++++++++--
 .../ExternalAppendOnlyUnsafeRowArray.scala         | 11 +++++++
 .../ExternalAppendOnlyUnsafeRowArraySuite.scala    | 37 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index c38327cae8c..d836cf3f0e3 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -18,6 +18,7 @@
 package org.apache.spark.util.collection.unsafe.sort;
 
 import javax.annotation.Nullable;
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.LinkedList;
@@ -25,13 +26,14 @@ import java.util.Queue;
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.SerializerManager;
@@ -745,7 +747,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   /**
   * Chain multiple UnsafeSorterIterators together as a single one.
    */
-  static class ChainedIterator extends UnsafeSorterIterator {
+  static class ChainedIterator extends UnsafeSorterIterator implements Closeable {
 
     private final Queue<UnsafeSorterIterator> iterators;
     private UnsafeSorterIterator current;
@@ -798,5 +800,23 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
 
     @Override
     public long getKeyPrefix() { return current.getKeyPrefix(); }
+
+    @Override
+    public void close() throws IOException {
+      if (iterators != null && !iterators.isEmpty()) {
+        for (UnsafeSorterIterator iterator : iterators) {
+          closeIfPossible(iterator);
+        }
+      }
+      if (current != null) {
+        closeIfPossible(current);
+      }
+    }
+
+    private void closeIfPossible(UnsafeSorterIterator iterator) {
+      if (iterator instanceof Closeable) {
+        IOUtils.closeQuietly(((Closeable) iterator));
+      }
+    }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index 2c9c91ec40b..4147d75186d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.Closeable
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkEnv, TaskContext}
@@ -192,10 +194,14 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
 
     protected def throwExceptionIfModified(): Unit = {
       if (expectedModificationsCount != modificationsCount) {
+        closeIfNeeded()
         throw QueryExecutionErrors.concurrentModificationOnExternalAppendOnlyUnsafeRowArrayError(
           classOf[ExternalAppendOnlyUnsafeRowArray].getName)
       }
     }
+
+    protected def closeIfNeeded(): Unit = {}
+
   }
 
   private[this] class InMemoryBufferIterator(startIndex: Int)
@@ -228,6 +234,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
       currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength)
       currentRow
     }
+
+    override protected def closeIfNeeded(): Unit = iterator match {
+      case c: Closeable => c.close()
+      case _ => // do nothing
+    }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index 98aba3ba25f..f140d867481 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.sql.execution
 
+import java.util
 import java.util.ConcurrentModificationException
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark._
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeSorterIterator, UnsafeSorterSpillReader}
 
 class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext {
   private val random = new java.util.Random()
@@ -155,6 +158,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
 
       assert(!iterator1.hasNext)
       intercept[ConcurrentModificationException](iterator1.next())
+      checkIteratorClosedWhenThrowConcurrentModificationException(iterator1)
     }
   }
 
@@ -178,6 +182,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
 
       assert(!iterator1.hasNext)
       intercept[ConcurrentModificationException](iterator1.next())
+      checkIteratorClosedWhenThrowConcurrentModificationException(iterator1)
     }
   }
 
@@ -265,6 +270,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
       populateRows(array, 1)
       assert(!iterator.hasNext)
       intercept[ConcurrentModificationException](iterator.next())
+      checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
 
       // Clearing the array should also invalidate any old iterators
       iterator = array.generateIterator()
@@ -274,6 +280,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
       array.clear()
       assert(!iterator.hasNext)
       intercept[ConcurrentModificationException](iterator.next())
+      checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
     }
   }
 
@@ -292,6 +299,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
       populateRows(array, 1)
       assert(!iterator.hasNext)
       intercept[ConcurrentModificationException](iterator.next())
+      checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
 
       // Clearing the array should also invalidate any old iterators
       iterator = array.generateIterator()
@@ -301,6 +309,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
       array.clear()
       assert(!iterator.hasNext)
       intercept[ConcurrentModificationException](iterator.next())
+      checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
     }
   }
 
@@ -319,6 +328,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
       // Clearing an empty array should also invalidate any old iterators
       assert(!iterator.hasNext)
       intercept[ConcurrentModificationException](iterator.next())
+      checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
     }
   }
 
@@ -372,4 +382,31 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
       assert(getNumBytesSpilled > bytesSpilled)
     }
   }
+
+  private def checkIteratorClosedWhenThrowConcurrentModificationException(
+      iterator: Iterator[UnsafeRow]): Unit = {
+    def getFieldValue(obj: Any, fieldName: String): Any = {
+      val field = obj.getClass.getDeclaredField(fieldName)
+      field.setAccessible(true)
+      field.get(obj)
+    }
+    def checkUnsafeSorterSpillReaderClosed(
+        unsafeSorterIterator: UnsafeSorterIterator): Unit = unsafeSorterIterator match {
+      case reader: UnsafeSorterSpillReader =>
+        // If UnsafeSorterSpillReader is not closed, `in` and `din` are not null
+        assert(getFieldValue(reader, "in") == null)
+        assert(getFieldValue(reader, "din") == null)
+      case _ => // do nothing
+    }
+    // Only check `SpillableArrayIterator` because `InMemoryBufferIterator` does not open a file handle
+    if (iterator.getClass.getSimpleName.equals("SpillableArrayIterator")) {
+      val chainedIterator = getFieldValue(iterator, "iterator")
+      val current = getFieldValue(chainedIterator, "current")
+      assert(current.isInstanceOf[UnsafeSorterIterator])
+      checkUnsafeSorterSpillReaderClosed(current.asInstanceOf[UnsafeSorterIterator])
+      val iterators = getFieldValue(chainedIterator, "iterators")
+      iterators.asInstanceOf[util.Queue[UnsafeSorterIterator]].asScala
+        .foreach(checkUnsafeSorterSpillReaderClosed)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org