Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/26 10:07:58 UTC

[spark] branch branch-3.1 updated: [SPARK-34224][CORE][SQL][SS][DSTREAM][YARN][TEST][EXAMPLES] Ensure all resources opened by `Source.fromXXX` are closed

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 82da778  [SPARK-34224][CORE][SQL][SS][DSTREAM][YARN][TEST][EXAMPLES] Ensure all resources opened by `Source.fromXXX` are closed
82da778 is described below

commit 82da7788b7c8eac52dd5a9615f12e9145df7b271
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Tue Jan 26 19:06:37 2021 +0900

    [SPARK-34224][CORE][SQL][SS][DSTREAM][YARN][TEST][EXAMPLES] Ensure all resources opened by `Source.fromXXX` are closed
    
    ### What changes were proposed in this pull request?
    Calling a method like `.mkString` or `.getLines` directly on a `scala.io.Source` opened by `fromFile`, `fromURL`, or `fromURI` leaks the underlying file handle. This PR wraps these `BufferedSource` instances in `Utils.tryWithResource` to ensure they are closed.
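    
    As an illustration (not part of the patch), here is a minimal, self-contained sketch of the leak and the fix. The `tryWithResource` helper below is only a stand-in for `org.apache.spark.util.Utils.tryWithResource`, which is `private[spark]`, and the file path is a placeholder:
    
    ```scala
    import java.io.Closeable
    import scala.io.Source
    
    object SourceLeakSketch {
      // Stand-in mirroring Utils.tryWithResource: open the resource,
      // apply f, and always close the resource, even if f throws.
      def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
        val resource = createResource
        try f(resource) finally resource.close()
      }
    
      def main(args: Array[String]): Unit = {
        val path = args.headOption.getOrElse("/etc/hosts") // placeholder readable file
    
        // Leaky: the BufferedSource (and its file handle) is never closed.
        // val contents = Source.fromFile(path).mkString
    
        // Fixed: the source is closed whether or not mkString throws.
        val contents = tryWithResource(Source.fromFile(path))(_.mkString)
        println(s"read ${contents.length} chars from $path")
      }
    }
    ```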
    
    ### Why are the changes needed?
    Avoid file handle leak.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass the Jenkins or GitHub Actions builds.
    
    Closes #31323 from LuciferYang/source-not-closed.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 8999e8805d7e9786cdb5b96575b264f922c232a2)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../scala/org/apache/spark/SparkContextSuite.scala |  3 +-
 .../spark/deploy/LogUrlsStandaloneSuite.scala      |  4 +--
 .../apache/spark/deploy/master/MasterSuite.scala   | 37 ++++++++++++++--------
 .../org/apache/spark/ui/UISeleniumSuite.scala      | 13 ++++----
 .../test/scala/org/apache/spark/ui/UISuite.scala   |  4 +--
 .../spark/util/MutableURLClassLoaderSuite.scala    |  6 ++--
 .../apache/spark/examples/DFSReadWriteTest.scala   |  5 ++-
 .../sql/kafka010/KafkaSourceOffsetSuite.scala      |  4 +--
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  3 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  4 +--
 .../spark/sql/execution/QueryExecutionSuite.scala  | 16 ++++++----
 .../sql/streaming/FileStreamSourceSuite.scala      |  6 ++--
 .../hive/HiveExternalCatalogVersionsSuite.scala    |  5 +--
 .../spark/streaming/util/RawTextSender.scala       |  4 +--
 14 files changed, 66 insertions(+), 48 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 55bfa70..8c9c217 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
 
 import scala.concurrent.duration._
+import scala.io.Source
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
@@ -376,7 +377,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       sc.addFile(file1.getAbsolutePath)
       def getAddedFileContents(): String = {
         sc.parallelize(Seq(0)).map { _ =>
-          scala.io.Source.fromFile(SparkFiles.get("file")).mkString
+          Utils.tryWithResource(Source.fromFile(SparkFiles.get("file")))(_.mkString)
         }.first()
       }
       assert(getAddedFileContents() === "old")
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 84fc169..5d60aad 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -25,7 +25,7 @@ import scala.io.Source
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.util.SparkConfWithEnv
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -43,7 +43,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
       assert(info.logUrlMap.nonEmpty)
       // Browse to each URL to check that it's valid
       info.logUrlMap.foreach { case (logType, logUrl) =>
-        val html = Source.fromURL(logUrl).mkString
+        val html = Utils.tryWithResource(Source.fromURL(logUrl))(_.mkString)
         assert(html.contains(s"$logType log page"))
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index b1b97a6..9296274 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -48,6 +48,7 @@ import org.apache.spark.resource.{ResourceInformation, ResourceRequirement}
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer
+import org.apache.spark.util.Utils
 
 object MockWorker {
   val counter = new AtomicInteger(10000)
@@ -327,22 +328,25 @@ class MasterSuite extends SparkFunSuite
     val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
     try {
       eventually(timeout(5.seconds), interval(100.milliseconds)) {
-        val json = Source.fromURL(s"$masterUrl/json").getLines().mkString("\n")
+        val json = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
         val JArray(workers) = (parse(json) \ "workers")
         workers.size should be (2)
         workers.foreach { workerSummaryJson =>
           val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
-          val workerResponse = parse(Source.fromURL(s"${workerWebUi}/json")
-            .getLines().mkString("\n"))
+          val workerResponse = parse(Utils
+            .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n")))
           (workerResponse \ "cores").extract[Int] should be (2)
         }
 
-        val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
+        val html = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
         html should include ("Spark Master at spark://")
         val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList
         workerLinks.size should be (2)
         workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) =>
-          val workerHtml = Source.fromURL(workerUrl).getLines().mkString("\n")
+          val workerHtml = Utils
+            .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n"))
           workerHtml should include ("Spark Worker at")
           workerHtml should include ("Running Executors (0)")
         }
@@ -361,8 +365,8 @@ class MasterSuite extends SparkFunSuite
     val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
     try {
       eventually(timeout(5.seconds), interval(100.milliseconds)) {
-        val json = Source.fromURL(s"$masterUrl/json")
-          .getLines().mkString("\n")
+        val json = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
         val JArray(workers) = (parse(json) \ "workers")
         workers.size should be (2)
         workers.foreach { workerSummaryJson =>
@@ -370,11 +374,13 @@ class MasterSuite extends SparkFunSuite
           // explicitly construct reverse proxy url targeting the master
           val JString(workerId) = workerSummaryJson \ "id"
           val url = s"$masterUrl/proxy/${workerId}/json"
-          val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
+          val workerResponse = parse(
+            Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
           (workerResponse \ "cores").extract[Int] should be (2)
         }
 
-        val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
+        val html = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
         html should include ("Spark Master at spark://")
         html should include ("""href="/static""")
         html should include ("""src="/static""")
@@ -397,8 +403,8 @@ class MasterSuite extends SparkFunSuite
     val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
     try {
       eventually(timeout(5.seconds), interval(100.milliseconds)) {
-        val json = Source.fromURL(s"$masterUrl/json")
-          .getLines().mkString("\n")
+        val json = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
         val JArray(workers) = (parse(json) \ "workers")
         workers.size should be (2)
         workers.foreach { workerSummaryJson =>
@@ -406,7 +412,8 @@ class MasterSuite extends SparkFunSuite
           // explicitly construct reverse proxy url targeting the master
           val JString(workerId) = workerSummaryJson \ "id"
           val url = s"$masterUrl/proxy/${workerId}/json"
-          val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
+          val workerResponse = parse(Utils
+            .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
           (workerResponse \ "cores").extract[Int] should be (2)
           (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/")
         }
@@ -417,7 +424,8 @@ class MasterSuite extends SparkFunSuite
         System.getProperty("spark.ui.proxyBase") should startWith
           (s"$reverseProxyUrl/proxy/worker-")
         System.setProperty("spark.ui.proxyBase", reverseProxyUrl)
-        val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
+        val html = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
         html should include ("Spark Master at spark://")
         verifyStaticResourcesServedByProxy(html, reverseProxyUrl)
         verifyWorkerUI(html, masterUrl, reverseProxyUrl)
@@ -439,7 +447,8 @@ class MasterSuite extends SparkFunSuite
         // construct url directly targeting the master
         val url = s"$masterUrl/proxy/$workerId/"
         System.setProperty("spark.ui.proxyBase", workerUrl)
-        val workerHtml = Source.fromURL(url).getLines().mkString("\n")
+        val workerHtml = Utils
+          .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
         workerHtml should include ("Spark Worker at")
         workerHtml should include ("Running Executors (0)")
         verifyStaticResourcesServedByProxy(workerHtml, workerUrl)
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index d7caeaa..044d00a 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -46,6 +46,7 @@ import org.apache.spark.internal.config.Status._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
+import org.apache.spark.util.Utils
 
 private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
 
@@ -688,8 +689,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       rdd.count()
 
       eventually(timeout(5.seconds), interval(100.milliseconds)) {
-        val stage0 = Source.fromURL(sc.ui.get.webUrl +
-          "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
+        val stage0 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+          "/stages/stage/?id=0&attempt=0&expandDagViz=true"))(_.mkString)
         assert(stage0.contains("digraph G {\n  subgraph clusterstage_0 {\n    " +
           "label=&quot;Stage 0&quot;;\n    subgraph "))
         assert(stage0.contains("{\n      label=&quot;parallelize&quot;;\n      " +
@@ -699,8 +700,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         assert(stage0.contains("{\n      label=&quot;groupBy&quot;;\n      " +
           "2 [labelType=&quot;html&quot; label=&quot;MapPartitionsRDD [2]"))
 
-        val stage1 = Source.fromURL(sc.ui.get.webUrl +
-          "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
+        val stage1 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+          "/stages/stage/?id=1&attempt=0&expandDagViz=true"))(_.mkString)
         assert(stage1.contains("digraph G {\n  subgraph clusterstage_1 {\n    " +
           "label=&quot;Stage 1&quot;;\n    subgraph "))
         assert(stage1.contains("{\n      label=&quot;groupBy&quot;;\n      " +
@@ -710,8 +711,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         assert(stage1.contains("{\n      label=&quot;groupBy&quot;;\n      " +
           "5 [labelType=&quot;html&quot; label=&quot;MapPartitionsRDD [5]"))
 
-        val stage2 = Source.fromURL(sc.ui.get.webUrl +
-          "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
+        val stage2 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+          "/stages/stage/?id=2&attempt=0&expandDagViz=true"))(_.mkString)
         assert(stage2.contains("digraph G {\n  subgraph clusterstage_2 {\n    " +
           "label=&quot;Stage 2&quot;;\n    subgraph "))
         assert(stage2.contains("{\n      label=&quot;groupBy&quot;;\n      " +
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index c7e1dfe..fb3015e 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -76,7 +76,7 @@ class UISuite extends SparkFunSuite {
     withSpark(newSparkContext()) { sc =>
       // test if the ui is visible, and all the expected tabs are visible
       eventually(timeout(10.seconds), interval(50.milliseconds)) {
-        val html = Source.fromURL(sc.ui.get.webUrl).mkString
+        val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl))(_.mkString)
         assert(!html.contains("random data that should not be present"))
         assert(html.toLowerCase(Locale.ROOT).contains("stages"))
         assert(html.toLowerCase(Locale.ROOT).contains("storage"))
@@ -90,7 +90,7 @@ class UISuite extends SparkFunSuite {
     withSpark(newSparkContext()) { sc =>
       // test if visible from http://localhost:4040
       eventually(timeout(10.seconds), interval(50.milliseconds)) {
-        val html = Source.fromURL("http://localhost:4040").mkString
+        val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040"))(_.mkString)
         assert(html.toLowerCase(Locale.ROOT).contains("stages"))
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index 3063e79..9435b5a 100644
--- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -112,9 +112,9 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
     val res1 = classLoader.getResources("resource1").asScala.toList
     assert(res1.size === 2)
     assert(classLoader.getResources("resource2").asScala.size === 1)
-
-    res1.map(scala.io.Source.fromURL(_).mkString) should contain inOrderOnly
-      ("resource1Contents-child", "resource1Contents-parent")
+    res1.map { res =>
+      Utils.tryWithResource(scala.io.Source.fromURL(res))(_.mkString)
+    } should contain inOrderOnly("resource1Contents-child", "resource1Contents-parent")
     classLoader.close()
     parentLoader.close()
   }
diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index a738598..323bab4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
 
 /**
  * Simple test for reading and writing to a distributed
@@ -46,9 +47,7 @@ object DFSReadWriteTest {
   private val NPARAMS = 2
 
   private def readFile(filename: String): List[String] = {
-    val lineIter: Iterator[String] = fromFile(filename).getLines()
-    val lineList: List[String] = lineIter.toList
-    lineList
+    Utils.tryWithResource(fromFile(filename))(_.getLines().toList)
   }
 
   private def printUsage(): Unit = {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index ef902fc..553ab42 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.OffsetSuite
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
 
 class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {
 
@@ -99,7 +100,6 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {
   private def readFromResource(file: String): SerializedOffset = {
     import scala.io.Source
     val input = getClass.getResource(s"/$file").toURI
-    val str = Source.fromFile(input).mkString
-    SerializedOffset(str)
+    SerializedOffset(Utils.tryWithResource(Source.fromFile(input))(_.mkString))
   }
 }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 43ed4a8..fc911ed 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -167,7 +167,8 @@ class KafkaTestUtils(
    * In this method we rewrite krb5.conf to make kdc and client use the same enctypes
    */
   private def rewriteKrb5Conf(): Unit = {
-    val krb5Conf = Source.fromFile(kdc.getKrb5conf, "UTF-8").getLines()
+    val krb5Conf = Utils
+      .tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8"))(_.getLines().toList)
     var rewritten = false
     val addedConfig =
       addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") +
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 222b24c..9bc934d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -480,7 +480,7 @@ private object YarnClusterDriver extends Logging with Matchers {
       executorInfos.foreach { info =>
         assert(info.logUrlMap.nonEmpty)
         info.logUrlMap.values.foreach { url =>
-          val log = Source.fromURL(url).mkString
+          val log = Utils.tryWithResource(Source.fromURL(url))(_.mkString)
           assert(
             !log.contains(SECRET_PASSWORD),
             s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
@@ -499,7 +499,7 @@ private object YarnClusterDriver extends Logging with Matchers {
         assert(driverLogs.contains("stdout"))
         val urlStr = driverLogs("stderr")
         driverLogs.foreach { kv =>
-          val log = Source.fromURL(kv._2).mkString
+          val log = Utils.tryWithResource(Source.fromURL(kv._2))(_.mkString)
           assert(
             !log.contains(SECRET_PASSWORD),
             s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 585ce4e..cf733b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
 
 case class QueryExecutionTestRecord(
     c0: Int, c1: Int, c2: Int, c3: Int, c4: Int,
@@ -36,8 +37,9 @@ case class QueryExecutionTestRecord(
 class QueryExecutionSuite extends SharedSparkSession {
   import testImplicits._
 
-  def checkDumpedPlans(path: String, expected: Int): Unit = {
-    assert(Source.fromFile(path).getLines.toList
+  def checkDumpedPlans(path: String, expected: Int): Unit = Utils.tryWithResource(
+    Source.fromFile(path)) { source =>
+    assert(source.getLines.toList
       .takeWhile(_ != "== Whole Stage Codegen ==") == List(
       "== Parsed Logical Plan ==",
       s"Range (0, $expected, step=1, splits=Some(2))",
@@ -99,7 +101,8 @@ class QueryExecutionSuite extends SharedSparkSession {
       val path = dir.getCanonicalPath + "/plans.txt"
       val df = spark.range(0, 10)
       df.queryExecution.debug.toFile(path, explainMode = Option("formatted"))
-      assert(Source.fromFile(path).getLines.toList
+      val lines = Utils.tryWithResource(Source.fromFile(path))(_.getLines().toList)
+      assert(lines
         .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List(
         "== Physical Plan ==",
         s"* Range (1)",
@@ -135,9 +138,10 @@ class QueryExecutionSuite extends SharedSparkSession {
         0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
         16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26)))
       ds.queryExecution.debug.toFile(path)
-      val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation"))
-
-      assert(!localRelations.exists(_.contains("more fields")))
+      Utils.tryWithResource(Source.fromFile(path)) { source =>
+        val localRelations = source.getLines().filter(_.contains("LocalRelation"))
+        assert(!localRelations.exists(_.contains("more fields")))
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6b9fa9c..ff00c47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1532,8 +1532,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   private def readOffsetFromResource(file: String): SerializedOffset = {
     import scala.io.Source
-    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
-    SerializedOffset(str.trim)
+    Utils.tryWithResource(
+      Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI)) { source =>
+      SerializedOffset(source.mkString.trim)
+    }
   }
 
   private def runTwoBatchesAndVerifyResults(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 5d276a1..cc278ea 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -239,8 +239,9 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
   // Tests the latest version of every release line.
   val testingVersions: Seq[String] = {
     import scala.io.Source
-    val versions: Seq[String] = try {
-      Source.fromURL(s"${releaseMirror}/spark").mkString
+    val versions: Seq[String] = try Utils.tryWithResource(
+      Source.fromURL(s"$releaseMirror/spark")) { source =>
+      source.mkString
         .split("\n")
         .filter(_.contains("""<a href="spark-"""))
         .filterNot(_.contains("preview"))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 5d4fcf8..a63d50f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -26,7 +26,7 @@ import scala.io.Source
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.util.IntParam
+import org.apache.spark.util.{IntParam, Utils}
 
 /**
  * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
@@ -45,7 +45,7 @@ object RawTextSender extends Logging {
     val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args
 
     // Repeat the input data multiple times to fill in a buffer
-    val lines = Source.fromFile(file).getLines().toArray
+    val lines = Utils.tryWithResource(Source.fromFile(file))(_.getLines().toArray)
     val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
     val ser = new KryoSerializer(new SparkConf()).newInstance()
     val serStream = ser.serializeStream(bufferStream)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org