You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by wu...@apache.org on 2021/07/23 15:19:41 UTC
[spark] branch branch-3.2 updated: [SPARK-36242][CORE] Ensure spill
file closed before set `success = true` in
`ExternalSorter.spillMemoryIteratorToDisk` method
This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new b46a9f3 [SPARK-36242][CORE] Ensure spill file closed before set `success = true` in `ExternalSorter.spillMemoryIteratorToDisk` method
b46a9f3 is described below
commit b46a9f3b0fb43fc08489ace0582032fb3a6fa866
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Fri Jul 23 23:15:13 2021 +0800
[SPARK-36242][CORE] Ensure spill file closed before set `success = true` in `ExternalSorter.spillMemoryIteratorToDisk` method
### What changes were proposed in this pull request?
The main change of this pr is move `writer.close()` before `success = true` to ensure spill file closed before set `success = true` in `ExternalSorter.spillMemoryIteratorToDisk` method.
### Why are the changes needed?
Avoid setting `success = true` before the spill file has been closed; otherwise a failure while closing the file would happen after success has already been recorded.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass the Jenkins or GitHub Action
- Add a new Test case to check `The spill file should not exists if writer close fails`
Closes #33460 from LuciferYang/external-sorter-spill-close.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: yi.wu <yi...@databricks.com>
(cherry picked from commit f61d5993eafe024effd3e0c4c17bd9779c704073)
Signed-off-by: yi.wu <yi...@databricks.com>
---
.../spark/util/collection/ExternalSorter.scala | 5 +-
.../util/collection/ExternalSorterSpillSuite.scala | 147 +++++++++++++++++++++
2 files changed, 149 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index dba9e74..c63e196 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -313,14 +313,13 @@ private[spark] class ExternalSorter[K, V, C](
}
if (objectsWritten > 0) {
flush()
+ writer.close()
} else {
writer.revertPartialWritesAndClose()
}
success = true
} finally {
- if (success) {
- writer.close()
- } else {
+ if (!success) {
// This code path only happens if an exception was thrown above before we set success;
// close our stuff and let the exception be thrown further
writer.revertPartialWritesAndClose()
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSpillSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSpillSuite.scala
new file mode 100644
index 0000000..959d5d8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSpillSuite.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.io.{File, IOException}
+import java.util.UUID
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.mockito.ArgumentMatchers.{any, anyInt}
+import org.mockito.Mockito.{mock, when}
+import org.mockito.invocation.InvocationOnMock
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite, TaskContext}
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.internal.config
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.serializer.{KryoSerializer, SerializerInstance, SerializerManager}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter, TempShuffleBlockId}
+import org.apache.spark.util.{Utils => UUtils}
+
class ExternalSorterSpillSuite extends SparkFunSuite with BeforeAndAfterEach {

  // Collects every temp spill file handed out by the mocked DiskBlockManager so
  // the test can check whether it survived a failed close.
  private val spillFilesCreated = ArrayBuffer.empty[File]

  private var tempDir: File = _
  private var conf: SparkConf = _
  private var taskMemoryManager: TaskMemoryManager = _

  private var blockManager: BlockManager = _
  private var diskBlockManager: DiskBlockManager = _
  private var taskContext: TaskContext = _

  override protected def beforeEach(): Unit = {
    tempDir = UUtils.createTempDir(null, "test")
    spillFilesCreated.clear()

    // Install a mocked SparkEnv so ExternalSorter picks up our conf, serializer
    // and (mocked) block manager without starting a real Spark environment.
    val env: SparkEnv = mock(classOf[SparkEnv])
    SparkEnv.set(env)

    conf = new SparkConf()
    when(SparkEnv.get.conf).thenReturn(conf)

    val serializer = new KryoSerializer(conf)
    when(SparkEnv.get.serializer).thenReturn(serializer)

    blockManager = mock(classOf[BlockManager])
    when(SparkEnv.get.blockManager).thenReturn(blockManager)

    val manager = new SerializerManager(serializer, conf)
    when(blockManager.serializerManager).thenReturn(manager)

    diskBlockManager = mock(classOf[DiskBlockManager])
    when(blockManager.diskBlockManager).thenReturn(diskBlockManager)

    taskContext = mock(classOf[TaskContext])
    val memoryManager = new TestMemoryManager(conf)
    taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
    when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager)

    // Every requested temp shuffle block becomes a real temp file under tempDir,
    // recorded in spillFilesCreated for later inspection.
    when(diskBlockManager.createTempShuffleBlock())
      .thenAnswer((_: InvocationOnMock) => {
        val blockId = TempShuffleBlockId(UUID.randomUUID)
        val file = File.createTempFile("spillFile", ".spill", tempDir)
        spillFilesCreated += file
        (blockId, file)
      })
  }

  override protected def afterEach(): Unit = {
    UUtils.deleteRecursively(tempDir)
    SparkEnv.set(null)

    // Fail loudly if the sorter leaked task memory during the test.
    val leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory
    if (leakedMemory != 0) {
      fail("Test leaked " + leakedMemory + " bytes of managed memory")
    }
  }

  test("SPARK-36242 Spill File should not exists if writer close fails") {
    // Prepare the data and ensure that the amount of data let the `spill()` method
    // to enter the `objectsWritten > 0` branch
    val writeSize = conf.get(config.SHUFFLE_SPILL_BATCH_SIZE) + 1
    val dataBuffer = new PartitionedPairBuffer[Int, Int]
    (0 until writeSize.toInt).foreach(i => dataBuffer.insert(0, 0, i))

    val externalSorter = new TestExternalSorter[Int, Int, Int](taskContext)

    // Mock the answer of `blockManager.getDiskWriter` and let the `close()` method of
    // `DiskBlockObjectWriter` throw IOException.
    val errorMessage = "Spill file close failed"
    when(blockManager.getDiskWriter(
      any(classOf[BlockId]),
      any(classOf[File]),
      any(classOf[SerializerInstance]),
      anyInt(),
      any(classOf[ShuffleWriteMetrics])
    )).thenAnswer((invocation: InvocationOnMock) => {
      val args = invocation.getArguments
      new DiskBlockObjectWriter(
        args(1).asInstanceOf[File],
        blockManager.serializerManager,
        args(2).asInstanceOf[SerializerInstance],
        args(3).asInstanceOf[Int],
        false,
        args(4).asInstanceOf[ShuffleWriteMetrics],
        args(0).asInstanceOf[BlockId]
      ) {
        override def close(): Unit = throw new IOException(errorMessage)
      }
    })

    val ioe = intercept[IOException] {
      externalSorter.spill(dataBuffer)
    }

    // SPARK-36242 fix: wrap the message comparison in assert(); the original
    // `ioe.getMessage.equals(errorMessage)` discarded the Boolean result and
    // never actually verified the message.
    assert(ioe.getMessage === errorMessage)
    // The `TempShuffleBlock` created by diskBlockManager
    // would remain on disk before SPARK-36242
    assert(!spillFilesCreated(0).exists())
  }
}
+
/**
 * Test-only subclass of `ExternalSorter` that widens the visibility of the
 * protected `spill` method so the suite can invoke it directly.
 */
private[this] class TestExternalSorter[K, V, C](context: TaskContext)
  extends ExternalSorter[K, V, C](context) {
  override def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    super.spill(collection)
  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org