You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2016/11/07 20:47:57 UTC

spark git commit: [SPARK-14914][CORE] Fix Resource not closed after using, mostly for unit tests

Repository: spark
Updated Branches:
  refs/heads/master 0d95662e7 -> 8f0ea011a


[SPARK-14914][CORE] Fix Resource not closed after using, mostly for unit tests

## What changes were proposed in this pull request?

Close `FileStreams`, `ZipFiles` etc to release the resources after using. Not closing the resources will cause IO Exception to be raised while deleting temp files.
## How was this patch tested?

Existing tests

Author: U-FAREAST\tl <tl...@microsoft.com>
Author: hyukjinkwon <gu...@gmail.com>
Author: Tao LI <tl...@microsoft.com>

Closes #15618 from HyukjinKwon/SPARK-14914-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0ea011
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0ea011
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0ea011

Branch: refs/heads/master
Commit: 8f0ea011a7294679ec4275b2fef349ef45b6eb81
Parents: 0d95662
Author: Hyukjin Kwon <gu...@gmail.com>
Authored: Mon Nov 7 12:47:39 2016 -0800
Committer: Mridul Muralidharan <mr...@gmail.com>
Committed: Mon Nov 7 12:47:39 2016 -0800

----------------------------------------------------------------------
 .../spark/rdd/ReliableCheckpointRDD.scala       | 13 +++++---
 .../test/scala/org/apache/spark/FileSuite.scala | 13 +++++---
 .../spark/deploy/RPackageUtilsSuite.scala       | 35 ++++++++++++--------
 .../deploy/history/FsHistoryProviderSuite.scala |  8 ++++-
 .../scheduler/EventLoggingListenerSuite.scala   | 26 +++++++++------
 .../spark/scheduler/TaskResultGetterSuite.scala |  7 ++--
 .../apache/spark/mllib/util/MLUtilsSuite.scala  | 16 +++++----
 .../apache/spark/streaming/JavaAPISuite.java    |  1 +
 .../spark/streaming/CheckpointSuite.scala       | 16 +++++----
 .../spark/streaming/MasterFailureTest.scala     |  1 +
 .../streaming/ReceivedBlockTrackerSuite.scala   |  5 +++
 .../streaming/util/WriteAheadLogSuite.scala     |  1 +
 12 files changed, 93 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index eac901d..9f800e3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -239,12 +239,17 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
       val fileInputStream = fs.open(partitionerFilePath, bufferSize)
       val serializer = SparkEnv.get.serializer.newInstance()
-      val deserializeStream = serializer.deserializeStream(fileInputStream)
-      val partitioner = Utils.tryWithSafeFinally[Partitioner] {
-        deserializeStream.readObject[Partitioner]
+      val partitioner = Utils.tryWithSafeFinally {
+        val deserializeStream = serializer.deserializeStream(fileInputStream)
+        Utils.tryWithSafeFinally {
+          deserializeStream.readObject[Partitioner]
+        } {
+          deserializeStream.close()
+        }
       } {
-        deserializeStream.close()
+        fileInputStream.close()
       }
+
       logDebug(s"Read partitioner from $partitionerFilePath")
       Some(partitioner)
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index cc52bb1..89f0b1c 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -58,10 +58,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     nums.saveAsTextFile(outputDir)
     // Read the plain text file and check it's OK
     val outputFile = new File(outputDir, "part-00000")
-    val content = Source.fromFile(outputFile).mkString
-    assert(content === "1\n2\n3\n4\n")
-    // Also try reading it in as a text file RDD
-    assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+    val bufferSrc = Source.fromFile(outputFile)
+    Utils.tryWithSafeFinally {
+      val content = bufferSrc.mkString
+      assert(content === "1\n2\n3\n4\n")
+      // Also try reading it in as a text file RDD
+      assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+    } {
+      bufferSrc.close()
+    }
   }
 
   test("text files (compressed)") {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index 13cba94..0055870 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, Utils}
 
 class RPackageUtilsSuite
   extends SparkFunSuite
@@ -74,9 +74,13 @@ class RPackageUtilsSuite
     val deps = Seq(dep1, dep2).mkString(",")
     IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
       val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
-      assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
-      assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
-      assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+      Utils.tryWithSafeFinally {
+        assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
+        assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
+        assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
+      } {
+        jars.foreach(_.close())
+      }
     }
   }
 
@@ -131,7 +135,7 @@ class RPackageUtilsSuite
 
   test("SparkR zipping works properly") {
     val tempDir = Files.createTempDir()
-    try {
+    Utils.tryWithSafeFinally {
       IvyTestUtils.writeFile(tempDir, "test.R", "abc")
       val fakeSparkRDir = new File(tempDir, "SparkR")
       assert(fakeSparkRDir.mkdirs())
@@ -144,14 +148,19 @@ class RPackageUtilsSuite
       IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
       val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
       assert(finalZip.exists())
-      val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq
-      assert(entries.contains("/test.R"))
-      assert(entries.contains("/SparkR/abc.R"))
-      assert(entries.contains("/SparkR/DESCRIPTION"))
-      assert(!entries.contains("/package.zip"))
-      assert(entries.contains("/packageTest/def.R"))
-      assert(entries.contains("/packageTest/DESCRIPTION"))
-    } finally {
+      val zipFile = new ZipFile(finalZip)
+      Utils.tryWithSafeFinally {
+        val entries = zipFile.entries().asScala.map(_.getName).toSeq
+        assert(entries.contains("/test.R"))
+        assert(entries.contains("/SparkR/abc.R"))
+        assert(entries.contains("/SparkR/DESCRIPTION"))
+        assert(!entries.contains("/package.zip"))
+        assert(entries.contains("/packageTest/def.R"))
+        assert(entries.contains("/packageTest/DESCRIPTION"))
+      } {
+        zipFile.close()
+      }
+    } {
       FileUtils.deleteDirectory(tempDir)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index a5eda7b..2c41c43 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -449,8 +449,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
     val bstream = new BufferedOutputStream(cstream)
     if (isNewFormat) {
-      EventLoggingListener.initEventLog(new FileOutputStream(file))
+      val newFormatStream = new FileOutputStream(file)
+      Utils.tryWithSafeFinally {
+        EventLoggingListener.initEventLog(newFormatStream)
+      } {
+        newFormatStream.close()
+      }
     }
+
     val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
     Utils.tryWithSafeFinally {
       events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 7f48592..8a5ec37 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -202,8 +202,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
 
     // Make sure expected events exist in the log file.
     val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
-    val logStart = SparkListenerLogStart(SPARK_VERSION)
-    val lines = readLines(logData)
     val eventSet = mutable.Set(
       SparkListenerApplicationStart,
       SparkListenerBlockManagerAdded,
@@ -216,19 +214,25 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       SparkListenerTaskStart,
       SparkListenerTaskEnd,
       SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
-    lines.foreach { line =>
-      eventSet.foreach { event =>
-        if (line.contains(event)) {
-          val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
-          val eventType = Utils.getFormattedClassName(parsedEvent)
-          if (eventType == event) {
-            eventSet.remove(event)
+    Utils.tryWithSafeFinally {
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      val lines = readLines(logData)
+      lines.foreach { line =>
+        eventSet.foreach { event =>
+          if (line.contains(event)) {
+            val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+            val eventType = Utils.getFormattedClassName(parsedEvent)
+            if (eventType == event) {
+              eventSet.remove(event)
+            }
           }
         }
       }
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
+    } {
+      logData.close()
     }
-    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
-    assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
   }
 
   private def readLines(in: InputStream): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 9e472f9..ee95e4f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
 
     // ensure we reset the classloader after the test completes
     val originalClassLoader = Thread.currentThread.getContextClassLoader
-    try {
+    val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
+    Utils.tryWithSafeFinally {
       // load the exception from the jar
-      val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
       loader.addURL(jarFile.toURI.toURL)
       Thread.currentThread().setContextClassLoader(loader)
       val excClass: Class[_] = Utils.classForName("repro.MyException")
@@ -209,8 +209,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
 
       assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
       assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
-    } finally {
+    } {
       Thread.currentThread.setContextClassLoader(originalClassLoader)
+      loader.close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index e4e9be3..665708a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -155,13 +155,17 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "output")
     MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
-    val lines = outputDir.listFiles()
+    val sources = outputDir.listFiles()
       .filter(_.getName.startsWith("part-"))
-      .flatMap(Source.fromFile(_).getLines())
-      .toSet
-    val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
-    assert(lines === expected)
-    Utils.deleteRecursively(tempDir)
+      .map(Source.fromFile)
+    Utils.tryWithSafeFinally {
+      val lines = sources.flatMap(_.getLines()).toSet
+      val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
+      assert(lines === expected)
+    } {
+      sources.foreach(_.close())
+      Utils.deleteRecursively(tempDir)
+    }
   }
 
   test("appendBias") {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 3d54abd..648a5ab 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1805,6 +1805,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     // will be re-processed after recovery
     List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
     assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+    ssc.stop();
     Utils.deleteRecursively(tempDir);
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index b79cc65..41f16bf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -642,16 +642,18 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         val fileStream = ssc.textFileStream(testDir.toString)
         // Make value 3 take a large time to process, to ensure that the driver
         // shuts down in the middle of processing the 3rd batch
-        CheckpointSuite.batchThreeShouldBlockIndefinitely = true
-        val mappedStream = fileStream.map(s => {
+        CheckpointSuite.batchThreeShouldBlockALongTime = true
+        val mappedStream = fileStream.map { s =>
           val i = s.toInt
           if (i == 3) {
-            while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
-              Thread.sleep(Long.MaxValue)
+            if (CheckpointSuite.batchThreeShouldBlockALongTime) {
+              // It's not a good idea to let the thread run forever
+              // as resource won't be correctly released
+              Thread.sleep(6000)
             }
           }
           i
-        })
+        }
 
         // Reducing over a large window to ensure that recovery from driver failure
         // requires reprocessing of all the files seen before the failure
@@ -691,7 +693,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
       }
 
       // The original StreamingContext has now been stopped.
-      CheckpointSuite.batchThreeShouldBlockIndefinitely = false
+      CheckpointSuite.batchThreeShouldBlockALongTime = false
 
       // Create files while the streaming driver is down
       for (i <- Seq(4, 5, 6)) {
@@ -928,5 +930,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
 }
 
 private object CheckpointSuite extends Serializable {
-  var batchThreeShouldBlockIndefinitely: Boolean = true
+  var batchThreeShouldBlockALongTime: Boolean = true
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 60c8e70..fff2d6f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -164,6 +164,7 @@ object MasterFailureTest extends Logging {
     val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
 
     fileGeneratingThread.join()
+    ssc.stop()
     fs.delete(checkpointDir, true)
     fs.delete(testDir, true)
     logInfo("Finished test after " + killCount + " failures")

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 851013b..107c3f5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -134,6 +134,7 @@ class ReceivedBlockTrackerSuite
     val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
     getWrittenLogData() shouldEqual expectedWrittenData1
     getWriteAheadLogFiles() should have size 1
+    tracker1.stop()
 
     incrementTime()
 
@@ -141,6 +142,7 @@ class ReceivedBlockTrackerSuite
     val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
     tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
     tracker1_.hasUnallocatedReceivedBlocks should be (false)
+    tracker1_.stop()
 
     // Restart tracker and verify recovered list of unallocated blocks
     val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
@@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite
     val blockInfos2 = addBlockInfos(tracker2)
     tracker2.allocateBlocksToBatch(batchTime2)
     tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+    tracker2.stop()
 
     // Verify whether log has correct contents
     val expectedWrittenData2 = expectedWrittenData1 ++
@@ -192,6 +195,7 @@ class ReceivedBlockTrackerSuite
       getWriteAheadLogFiles() should not contain oldestLogFile
     }
     printLogFiles("After clean")
+    tracker3.stop()
 
     // Restart tracker and verify recovered state, specifically whether info about the first
     // batch has been removed, but not the second batch
@@ -200,6 +204,7 @@ class ReceivedBlockTrackerSuite
     tracker4.getUnallocatedBlocks(streamId) shouldBe empty
     tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty  // should be cleaned
     tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+    tracker4.stop()
   }
 
   test("disable write ahead log when checkpoint directory is not set") {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f0ea011/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 24cb5af..4bec52b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests(
         assert(getLogFilesInDirectory(testDir).size < logFiles.size)
       }
     }
+    writeAheadLog.close()
   }
 
   test(testPrefix + "handling file errors while reading rotating logs") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org