You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2022/04/26 13:48:06 UTC
[spark] branch master updated: [SPARK-38944][CORE][SQL] Close `UnsafeSorterSpillReader` before `SpillableArrayIterator` throw `ConcurrentModificationException`
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f1286f324c0 [SPARK-38944][CORE][SQL] Close `UnsafeSorterSpillReader` before `SpillableArrayIterator` throw `ConcurrentModificationException`
f1286f324c0 is described below
commit f1286f324c032f9a875167fdbb265b4d495752c9
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Tue Apr 26 08:47:54 2022 -0500
[SPARK-38944][CORE][SQL] Close `UnsafeSorterSpillReader` before `SpillableArrayIterator` throw `ConcurrentModificationException`
### What changes were proposed in this pull request?
When `SpillableArrayIterator` throws `ConcurrentModificationException`, the `UnsafeSorterSpillReader` it was reading from is leaked (the `InputStream` held by `UnsafeSorterSpillReader` is never closed). This PR adds a resource cleanup step that runs before `SpillableArrayIterator` throws `ConcurrentModificationException`.
### Why are the changes needed?
Fix file resource leak.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Pass GA
- Add new check in `ExternalAppendOnlyUnsafeRowArraySuite`
run command:
```
mvn clean install -pl sql/core -am -DskipTests
mvn clean test -pl sql/core -Dtest=none -DwildcardSuites=org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArraySuite
```
**Before**
```
- test iterator invalidation (with spill) *** FAILED ***
org.apache.spark.io.ReadAheadInputStream@478b0739 did not equal null (ExternalAppendOnlyUnsafeRowArraySuite.scala:397)
Run completed in 9 seconds, 652 milliseconds.
Total number of tests run: 14
Suites: completed 2, aborted 0
Tests: succeeded 13, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***
```
**After**
```
Run completed in 8 seconds, 535 milliseconds.
Total number of tests run: 14
Suites: completed 2, aborted 0
Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
Closes #36262 from LuciferYang/SPARK-38944.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../unsafe/sort/UnsafeExternalSorter.java | 24 ++++++++++++--
.../ExternalAppendOnlyUnsafeRowArray.scala | 11 +++++++
.../ExternalAppendOnlyUnsafeRowArraySuite.scala | 37 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index c38327cae8c..d836cf3f0e3 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -18,6 +18,7 @@
package org.apache.spark.util.collection.unsafe.sort;
import javax.annotation.Nullable;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
@@ -25,13 +26,14 @@ import java.util.Queue;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TooLargePageException;
import org.apache.spark.serializer.SerializerManager;
@@ -745,7 +747,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
/**
* Chain multiple UnsafeSorterIterator together as single one.
*/
- static class ChainedIterator extends UnsafeSorterIterator {
+ static class ChainedIterator extends UnsafeSorterIterator implements Closeable {
private final Queue<UnsafeSorterIterator> iterators;
private UnsafeSorterIterator current;
@@ -798,5 +800,23 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
@Override
public long getKeyPrefix() { return current.getKeyPrefix(); }
+
+ @Override
+ public void close() throws IOException { // closes whatever this chain holds that is Closeable
+ if (iterators != null && !iterators.isEmpty()) {
+ for (UnsafeSorterIterator iterator : iterators) { // every iterator held in the `iterators` queue
+ closeIfPossible(iterator);
+ }
+ }
+ if (current != null) { // then the `current` iterator, when one is set
+ closeIfPossible(current);
+ }
+ }
+
+ private void closeIfPossible(UnsafeSorterIterator iterator) { // no-op unless the iterator is Closeable
+ if (iterator instanceof Closeable) {
+ IOUtils.closeQuietly(((Closeable) iterator)); // quiet close: a failure while closing is not rethrown
+ }
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index 2c9c91ec40b..4147d75186d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import java.io.Closeable
+
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkEnv, TaskContext}
@@ -192,10 +194,14 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
protected def throwExceptionIfModified(): Unit = {
if (expectedModificationsCount != modificationsCount) {
+ closeIfNeeded() // let the concrete iterator release its resources before the exception is thrown
throw QueryExecutionErrors.concurrentModificationOnExternalAppendOnlyUnsafeRowArrayError(
classOf[ExternalAppendOnlyUnsafeRowArray].getName)
}
}
+
+ protected def closeIfNeeded(): Unit = {} // cleanup hook called by throwExceptionIfModified; does nothing by default
+
}
private[this] class InMemoryBufferIterator(startIndex: Int)
@@ -228,6 +234,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
currentRow.pointTo(iterator.getBaseObject, iterator.getBaseOffset, iterator.getRecordLength)
currentRow
}
+
+ override protected def closeIfNeeded(): Unit = iterator match { // close the wrapped iterator when it is Closeable
+ case c: Closeable => c.close() // NOTE(review): an exception thrown here would pre-empt the ConcurrentModificationException; confirm close() cannot throw
+ case _ => // not Closeable: nothing to release
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index 98aba3ba25f..f140d867481 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -17,13 +17,16 @@
package org.apache.spark.sql.execution
+import java.util
import java.util.ConcurrentModificationException
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeSorterIterator, UnsafeSorterSpillReader}
class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext {
private val random = new java.util.Random()
@@ -155,6 +158,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
assert(!iterator1.hasNext)
intercept[ConcurrentModificationException](iterator1.next())
+ checkIteratorClosedWhenThrowConcurrentModificationException(iterator1)
}
}
@@ -178,6 +182,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
assert(!iterator1.hasNext)
intercept[ConcurrentModificationException](iterator1.next())
+ checkIteratorClosedWhenThrowConcurrentModificationException(iterator1)
}
}
@@ -265,6 +270,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
populateRows(array, 1)
assert(!iterator.hasNext)
intercept[ConcurrentModificationException](iterator.next())
+ checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
// Clearing the array should also invalidate any old iterators
iterator = array.generateIterator()
@@ -274,6 +280,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
array.clear()
assert(!iterator.hasNext)
intercept[ConcurrentModificationException](iterator.next())
+ checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
}
}
@@ -292,6 +299,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
populateRows(array, 1)
assert(!iterator.hasNext)
intercept[ConcurrentModificationException](iterator.next())
+ checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
// Clearing the array should also invalidate any old iterators
iterator = array.generateIterator()
@@ -301,6 +309,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
array.clear()
assert(!iterator.hasNext)
intercept[ConcurrentModificationException](iterator.next())
+ checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
}
}
@@ -319,6 +328,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
// Clearing an empty array should also invalidate any old iterators
assert(!iterator.hasNext)
intercept[ConcurrentModificationException](iterator.next())
+ checkIteratorClosedWhenThrowConcurrentModificationException(iterator)
}
}
@@ -372,4 +382,31 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
assert(getNumBytesSpilled > bytesSpilled)
}
}
+
+ private def checkIteratorClosedWhenThrowConcurrentModificationException(
+ iterator: Iterator[UnsafeRow]): Unit = {
+ def getFieldValue(obj: Any, fieldName: String): Any = { // reads a declared field of `obj` via reflection
+ val field = obj.getClass.getDeclaredField(fieldName)
+ field.setAccessible(true)
+ field.get(obj)
+ }
+ def checkUnsafeSorterSpillReaderClosed(
+ unsafeSorterIterator: UnsafeSorterIterator): Unit = unsafeSorterIterator match {
+ case reader: UnsafeSorterSpillReader =>
+ // If the UnsafeSorterSpillReader has not been closed, `in` and `din` are non-null
+ assert(getFieldValue(reader, "in") == null)
+ assert(getFieldValue(reader, "din") == null)
+ case _ => // do nothing
+ }
+ // Only check `SpillableArrayIterator`, because `InMemoryBufferIterator` does not open a file handle
+ if (iterator.getClass.getSimpleName.equals("SpillableArrayIterator")) {
+ val chainedIterator = getFieldValue(iterator, "iterator")
+ val current = getFieldValue(chainedIterator, "current")
+ assert(current.isInstanceOf[UnsafeSorterIterator])
+ checkUnsafeSorterSpillReaderClosed(current.asInstanceOf[UnsafeSorterIterator])
+ val iterators = getFieldValue(chainedIterator, "iterators")
+ iterators.asInstanceOf[util.Queue[UnsafeSorterIterator]].asScala
+ .foreach(checkUnsafeSorterSpillReaderClosed)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org