You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/26 10:07:58 UTC
[spark] branch branch-3.1 updated:
[SPARK-34224][CORE][SQL][SS][DSTREAM][YARN][TEST][EXAMPLES] Ensure all
resource opened by `Source.fromXXX` are closed
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 82da778 [SPARK-34224][CORE][SQL][SS][DSTREAM][YARN][TEST][EXAMPLES] Ensure all resource opened by `Source.fromXXX` are closed
82da778 is described below
commit 82da7788b7c8eac52dd5a9615f12e9145df7b271
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Tue Jan 26 19:06:37 2021 +0900
[SPARK-34224][CORE][SQL][SS][DSTREAM][YARN][TEST][EXAMPLES] Ensure all resource opened by `Source.fromXXX` are closed
### What changes were proposed in this pull request?
Calling a function like `.mkString` or `.getLines` directly on a `scala.io.Source` opened by `fromFile`, `fromURL`, or `fromURI` leaks the underlying file handle. This PR uses the `Utils.tryWithResource` method to wrap each `BufferedSource` to ensure that it is closed after use.
### Why are the changes needed?
To avoid file handle leaks.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes the Jenkins or GitHub Actions checks.
Closes #31323 from LuciferYang/source-not-closed.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 8999e8805d7e9786cdb5b96575b264f922c232a2)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../scala/org/apache/spark/SparkContextSuite.scala | 3 +-
.../spark/deploy/LogUrlsStandaloneSuite.scala | 4 +--
.../apache/spark/deploy/master/MasterSuite.scala | 37 ++++++++++++++--------
.../org/apache/spark/ui/UISeleniumSuite.scala | 13 ++++----
.../test/scala/org/apache/spark/ui/UISuite.scala | 4 +--
.../spark/util/MutableURLClassLoaderSuite.scala | 6 ++--
.../apache/spark/examples/DFSReadWriteTest.scala | 5 ++-
.../sql/kafka010/KafkaSourceOffsetSuite.scala | 4 +--
.../apache/spark/sql/kafka010/KafkaTestUtils.scala | 3 +-
.../spark/deploy/yarn/YarnClusterSuite.scala | 4 +--
.../spark/sql/execution/QueryExecutionSuite.scala | 16 ++++++----
.../sql/streaming/FileStreamSourceSuite.scala | 6 ++--
.../hive/HiveExternalCatalogVersionsSuite.scala | 5 +--
.../spark/streaming/util/RawTextSender.scala | 4 +--
14 files changed, 66 insertions(+), 48 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 55bfa70..8c9c217 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
import scala.concurrent.duration._
+import scala.io.Source
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
@@ -376,7 +377,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
sc.addFile(file1.getAbsolutePath)
def getAddedFileContents(): String = {
sc.parallelize(Seq(0)).map { _ =>
- scala.io.Source.fromFile(SparkFiles.get("file")).mkString
+ Utils.tryWithResource(Source.fromFile(SparkFiles.get("file")))(_.mkString)
}.first()
}
assert(getAddedFileContents() === "old")
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 84fc169..5d60aad 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -25,7 +25,7 @@ import scala.io.Source
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.util.SparkConfWithEnv
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
@@ -43,7 +43,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
info.logUrlMap.foreach { case (logType, logUrl) =>
- val html = Source.fromURL(logUrl).mkString
+ val html = Utils.tryWithResource(Source.fromURL(logUrl))(_.mkString)
assert(html.contains(s"$logType log page"))
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index b1b97a6..9296274 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -48,6 +48,7 @@ import org.apache.spark.resource.{ResourceInformation, ResourceRequirement}
import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer
+import org.apache.spark.util.Utils
object MockWorker {
val counter = new AtomicInteger(10000)
@@ -327,22 +328,25 @@ class MasterSuite extends SparkFunSuite
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
- val json = Source.fromURL(s"$masterUrl/json").getLines().mkString("\n")
+ val json = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
val JArray(workers) = (parse(json) \ "workers")
workers.size should be (2)
workers.foreach { workerSummaryJson =>
val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
- val workerResponse = parse(Source.fromURL(s"${workerWebUi}/json")
- .getLines().mkString("\n"))
+ val workerResponse = parse(Utils
+ .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n")))
(workerResponse \ "cores").extract[Int] should be (2)
}
- val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
+ val html = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
html should include ("Spark Master at spark://")
val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList
workerLinks.size should be (2)
workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) =>
- val workerHtml = Source.fromURL(workerUrl).getLines().mkString("\n")
+ val workerHtml = Utils
+ .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n"))
workerHtml should include ("Spark Worker at")
workerHtml should include ("Running Executors (0)")
}
@@ -361,8 +365,8 @@ class MasterSuite extends SparkFunSuite
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
- val json = Source.fromURL(s"$masterUrl/json")
- .getLines().mkString("\n")
+ val json = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
val JArray(workers) = (parse(json) \ "workers")
workers.size should be (2)
workers.foreach { workerSummaryJson =>
@@ -370,11 +374,13 @@ class MasterSuite extends SparkFunSuite
// explicitly construct reverse proxy url targeting the master
val JString(workerId) = workerSummaryJson \ "id"
val url = s"$masterUrl/proxy/${workerId}/json"
- val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
+ val workerResponse = parse(
+ Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
(workerResponse \ "cores").extract[Int] should be (2)
}
- val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
+ val html = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
html should include ("Spark Master at spark://")
html should include ("""href="/static""")
html should include ("""src="/static""")
@@ -397,8 +403,8 @@ class MasterSuite extends SparkFunSuite
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
- val json = Source.fromURL(s"$masterUrl/json")
- .getLines().mkString("\n")
+ val json = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
val JArray(workers) = (parse(json) \ "workers")
workers.size should be (2)
workers.foreach { workerSummaryJson =>
@@ -406,7 +412,8 @@ class MasterSuite extends SparkFunSuite
// explicitly construct reverse proxy url targeting the master
val JString(workerId) = workerSummaryJson \ "id"
val url = s"$masterUrl/proxy/${workerId}/json"
- val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
+ val workerResponse = parse(Utils
+ .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
(workerResponse \ "cores").extract[Int] should be (2)
(workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/")
}
@@ -417,7 +424,8 @@ class MasterSuite extends SparkFunSuite
System.getProperty("spark.ui.proxyBase") should startWith
(s"$reverseProxyUrl/proxy/worker-")
System.setProperty("spark.ui.proxyBase", reverseProxyUrl)
- val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
+ val html = Utils
+ .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
html should include ("Spark Master at spark://")
verifyStaticResourcesServedByProxy(html, reverseProxyUrl)
verifyWorkerUI(html, masterUrl, reverseProxyUrl)
@@ -439,7 +447,8 @@ class MasterSuite extends SparkFunSuite
// construct url directly targeting the master
val url = s"$masterUrl/proxy/$workerId/"
System.setProperty("spark.ui.proxyBase", workerUrl)
- val workerHtml = Source.fromURL(url).getLines().mkString("\n")
+ val workerHtml = Utils
+ .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
workerHtml should include ("Spark Worker at")
workerHtml should include ("Running Executors (0)")
verifyStaticResourcesServedByProxy(workerHtml, workerUrl)
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index d7caeaa..044d00a 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -46,6 +46,7 @@ import org.apache.spark.internal.config.Status._
import org.apache.spark.internal.config.UI._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
+import org.apache.spark.util.Utils
private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
@@ -688,8 +689,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
rdd.count()
eventually(timeout(5.seconds), interval(100.milliseconds)) {
- val stage0 = Source.fromURL(sc.ui.get.webUrl +
- "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
+ val stage0 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+ "/stages/stage/?id=0&attempt=0&expandDagViz=true"))(_.mkString)
assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
"label="Stage 0";\n subgraph "))
assert(stage0.contains("{\n label="parallelize";\n " +
@@ -699,8 +700,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
assert(stage0.contains("{\n label="groupBy";\n " +
"2 [labelType="html" label="MapPartitionsRDD [2]"))
- val stage1 = Source.fromURL(sc.ui.get.webUrl +
- "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
+ val stage1 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+ "/stages/stage/?id=1&attempt=0&expandDagViz=true"))(_.mkString)
assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
"label="Stage 1";\n subgraph "))
assert(stage1.contains("{\n label="groupBy";\n " +
@@ -710,8 +711,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
assert(stage1.contains("{\n label="groupBy";\n " +
"5 [labelType="html" label="MapPartitionsRDD [5]"))
- val stage2 = Source.fromURL(sc.ui.get.webUrl +
- "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
+ val stage2 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+ "/stages/stage/?id=2&attempt=0&expandDagViz=true"))(_.mkString)
assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
"label="Stage 2";\n subgraph "))
assert(stage2.contains("{\n label="groupBy";\n " +
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index c7e1dfe..fb3015e 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -76,7 +76,7 @@ class UISuite extends SparkFunSuite {
withSpark(newSparkContext()) { sc =>
// test if the ui is visible, and all the expected tabs are visible
eventually(timeout(10.seconds), interval(50.milliseconds)) {
- val html = Source.fromURL(sc.ui.get.webUrl).mkString
+ val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl))(_.mkString)
assert(!html.contains("random data that should not be present"))
assert(html.toLowerCase(Locale.ROOT).contains("stages"))
assert(html.toLowerCase(Locale.ROOT).contains("storage"))
@@ -90,7 +90,7 @@ class UISuite extends SparkFunSuite {
withSpark(newSparkContext()) { sc =>
// test if visible from http://localhost:4040
eventually(timeout(10.seconds), interval(50.milliseconds)) {
- val html = Source.fromURL("http://localhost:4040").mkString
+ val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040"))(_.mkString)
assert(html.toLowerCase(Locale.ROOT).contains("stages"))
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index 3063e79..9435b5a 100644
--- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -112,9 +112,9 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
val res1 = classLoader.getResources("resource1").asScala.toList
assert(res1.size === 2)
assert(classLoader.getResources("resource2").asScala.size === 1)
-
- res1.map(scala.io.Source.fromURL(_).mkString) should contain inOrderOnly
- ("resource1Contents-child", "resource1Contents-parent")
+ res1.map { res =>
+ Utils.tryWithResource(scala.io.Source.fromURL(res))(_.mkString)
+ } should contain inOrderOnly("resource1Contents-child", "resource1Contents-parent")
classLoader.close()
parentLoader.close()
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index a738598..323bab4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
/**
* Simple test for reading and writing to a distributed
@@ -46,9 +47,7 @@ object DFSReadWriteTest {
private val NPARAMS = 2
private def readFile(filename: String): List[String] = {
- val lineIter: Iterator[String] = fromFile(filename).getLines()
- val lineList: List[String] = lineIter.toList
- lineList
+ Utils.tryWithResource(fromFile(filename))(_.getLines().toList)
}
private def printUsage(): Unit = {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index ef902fc..553ab42 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.OffsetSuite
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {
@@ -99,7 +100,6 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {
private def readFromResource(file: String): SerializedOffset = {
import scala.io.Source
val input = getClass.getResource(s"/$file").toURI
- val str = Source.fromFile(input).mkString
- SerializedOffset(str)
+ SerializedOffset(Utils.tryWithResource(Source.fromFile(input))(_.mkString))
}
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 43ed4a8..fc911ed 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -167,7 +167,8 @@ class KafkaTestUtils(
* In this method we rewrite krb5.conf to make kdc and client use the same enctypes
*/
private def rewriteKrb5Conf(): Unit = {
- val krb5Conf = Source.fromFile(kdc.getKrb5conf, "UTF-8").getLines()
+ val krb5Conf = Utils
+ .tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8"))(_.getLines().toList)
var rewritten = false
val addedConfig =
addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") +
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 222b24c..9bc934d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -480,7 +480,7 @@ private object YarnClusterDriver extends Logging with Matchers {
executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty)
info.logUrlMap.values.foreach { url =>
- val log = Source.fromURL(url).mkString
+ val log = Utils.tryWithResource(Source.fromURL(url))(_.mkString)
assert(
!log.contains(SECRET_PASSWORD),
s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
@@ -499,7 +499,7 @@ private object YarnClusterDriver extends Logging with Matchers {
assert(driverLogs.contains("stdout"))
val urlStr = driverLogs("stderr")
driverLogs.foreach { kv =>
- val log = Source.fromURL(kv._2).mkString
+ val log = Utils.tryWithResource(Source.fromURL(kv._2))(_.mkString)
assert(
!log.contains(SECRET_PASSWORD),
s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 585ce4e..cf733b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
case class QueryExecutionTestRecord(
c0: Int, c1: Int, c2: Int, c3: Int, c4: Int,
@@ -36,8 +37,9 @@ case class QueryExecutionTestRecord(
class QueryExecutionSuite extends SharedSparkSession {
import testImplicits._
- def checkDumpedPlans(path: String, expected: Int): Unit = {
- assert(Source.fromFile(path).getLines.toList
+ def checkDumpedPlans(path: String, expected: Int): Unit = Utils.tryWithResource(
+ Source.fromFile(path)) { source =>
+ assert(source.getLines.toList
.takeWhile(_ != "== Whole Stage Codegen ==") == List(
"== Parsed Logical Plan ==",
s"Range (0, $expected, step=1, splits=Some(2))",
@@ -99,7 +101,8 @@ class QueryExecutionSuite extends SharedSparkSession {
val path = dir.getCanonicalPath + "/plans.txt"
val df = spark.range(0, 10)
df.queryExecution.debug.toFile(path, explainMode = Option("formatted"))
- assert(Source.fromFile(path).getLines.toList
+ val lines = Utils.tryWithResource(Source.fromFile(path))(_.getLines().toList)
+ assert(lines
.takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List(
"== Physical Plan ==",
s"* Range (1)",
@@ -135,9 +138,10 @@ class QueryExecutionSuite extends SharedSparkSession {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26)))
ds.queryExecution.debug.toFile(path)
- val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation"))
-
- assert(!localRelations.exists(_.contains("more fields")))
+ Utils.tryWithResource(Source.fromFile(path)) { source =>
+ val localRelations = source.getLines().filter(_.contains("LocalRelation"))
+ assert(!localRelations.exists(_.contains("more fields")))
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6b9fa9c..ff00c47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1532,8 +1532,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
private def readOffsetFromResource(file: String): SerializedOffset = {
import scala.io.Source
- val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
- SerializedOffset(str.trim)
+ Utils.tryWithResource(
+ Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI)) { source =>
+ SerializedOffset(source.mkString.trim)
+ }
}
private def runTwoBatchesAndVerifyResults(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 5d276a1..cc278ea 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -239,8 +239,9 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
// Tests the latest version of every release line.
val testingVersions: Seq[String] = {
import scala.io.Source
- val versions: Seq[String] = try {
- Source.fromURL(s"${releaseMirror}/spark").mkString
+ val versions: Seq[String] = try Utils.tryWithResource(
+ Source.fromURL(s"$releaseMirror/spark")) { source =>
+ source.mkString
.split("\n")
.filter(_.contains("""<a href="spark-"""))
.filterNot(_.contains("preview"))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 5d4fcf8..a63d50f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -26,7 +26,7 @@ import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.util.IntParam
+import org.apache.spark.util.{IntParam, Utils}
/**
* A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
@@ -45,7 +45,7 @@ object RawTextSender extends Logging {
val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args
// Repeat the input data multiple times to fill in a buffer
- val lines = Source.fromFile(file).getLines().toArray
+ val lines = Utils.tryWithResource(Source.fromFile(file))(_.getLines().toArray)
val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
val ser = new KryoSerializer(new SparkConf()).newInstance()
val serStream = ser.serializeStream(bufferStream)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org