Posted to commits@spark.apache.org by gu...@apache.org on 2019/05/30 10:54:59 UTC
[spark] branch master updated: [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new db3e746 [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource
db3e746 is described below
commit db3e746b64a3f78ce60bcfd6f372735f574da95a
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Thu May 30 19:54:32 2019 +0900
[SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource
## What changes were proposed in this pull request?
This PR wraps every `PrintWriter` in `Utils.tryWithResource` to prevent resource leaks.
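For context, `Utils.tryWithResource` in `org.apache.spark.util.Utils` implements the loan pattern: it creates the resource, passes it to the caller's function, and closes it in a `finally` block so the close runs even when the function throws. A minimal sketch of the idiom (modeled on the helper in Spark's `Utils`; the exact body here is illustrative):

    import java.io.Closeable

    def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
      val resource = createResource
      try {
        f(resource)          // run the caller's block against the open resource
      } finally {
        resource.close()     // always executed, even when f throws
      }
    }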
## How was this patch tested?
Existing tests.
Closes #24739 from wangyum/SPARK-27875.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../spark/api/python/PythonBroadcastSuite.scala | 6 ++---
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 28 +++++++++----------
.../deploy/history/FsHistoryProviderSuite.scala | 10 +++----
.../spark/metrics/InputOutputMetricsSuite.scala | 12 ++++-----
.../spark/scheduler/ReplayListenerSuite.scala | 26 +++++++++---------
.../ml/param/shared/SharedParamsCodeGen.scala | 8 +++---
.../features/PodTemplateConfigMapStepSuite.scala | 7 ++---
.../apache/spark/sql/catalyst/util/package.scala | 13 ++++-----
.../spark/sql/execution/command/DDLSuite.scala | 12 ++++-----
.../apache/spark/sql/hive/StatisticsSuite.scala | 31 +++++++++++-----------
.../spark/sql/hive/client/VersionsSuite.scala | 6 ++---
.../spark/sql/hive/execution/HiveUDFSuite.scala | 12 ++++-----
12 files changed, 88 insertions(+), 83 deletions(-)
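Every hunk below applies the same mechanical rewrite. A composite before/after sketch (the `file` and `contents` names are illustrative, not taken from any single hunk): in the manual version, the file handle leaks if `write` throws before `close` runs; in the wrapped version, the writer is closed on every path.

    import java.io.PrintWriter
    import org.apache.spark.util.Utils

    // Before: if write() throws, close() is never reached and the handle leaks
    val writer = new PrintWriter(file)
    writer.write(contents)
    writer.close()

    // After: tryWithResource closes the writer even on the exception path
    Utils.tryWithResource(new PrintWriter(file)) { writer =>
      writer.write(contents)
    }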
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
index 24004de..dffdd96 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
@@ -42,9 +42,9 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
withTempDir { tempDir =>
val broadcastDataFile: File = {
val file = new File(tempDir, "broadcastData")
- val printWriter = new PrintWriter(file)
- printWriter.write(broadcastedString)
- printWriter.close()
+ Utils.tryWithResource(new PrintWriter(file)) { printWriter =>
+ printWriter.write(broadcastedString)
+ }
file
}
val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 65c9cb9..385f549 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -749,10 +749,10 @@ class SparkSubmitSuite
withTempDir { tmpDir =>
// Test jars and files
val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
- val writer1 = new PrintWriter(f1)
- writer1.println("spark.jars " + jars)
- writer1.println("spark.files " + files)
- writer1.close()
+ Utils.tryWithResource(new PrintWriter(f1)) { writer =>
+ writer.println("spark.jars " + jars)
+ writer.println("spark.files " + files)
+ }
val clArgs = Seq(
"--master", "local",
"--class", "org.SomeClass",
@@ -766,10 +766,10 @@ class SparkSubmitSuite
// Test files and archives (Yarn)
val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
- val writer2 = new PrintWriter(f2)
- writer2.println("spark.yarn.dist.files " + files)
- writer2.println("spark.yarn.dist.archives " + archives)
- writer2.close()
+ Utils.tryWithResource(new PrintWriter(f2)) { writer =>
+ writer.println("spark.yarn.dist.files " + files)
+ writer.println("spark.yarn.dist.archives " + archives)
+ }
val clArgs2 = Seq(
"--master", "yarn",
"--class", "org.SomeClass",
@@ -783,9 +783,9 @@ class SparkSubmitSuite
// Test python files
val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
- val writer3 = new PrintWriter(f3)
- writer3.println("spark.submit.pyFiles " + pyFiles)
- writer3.close()
+ Utils.tryWithResource(new PrintWriter(f3)) { writer =>
+ writer.println("spark.submit.pyFiles " + pyFiles)
+ }
val clArgs3 = Seq(
"--master", "local",
"--properties-file", f3.getPath,
@@ -802,10 +802,10 @@ class SparkSubmitSuite
val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
- val writer4 = new PrintWriter(f4)
val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
- writer4.println("spark.submit.pyFiles " + remotePyFiles)
- writer4.close()
+ Utils.tryWithResource(new PrintWriter(f4)) { writer =>
+ writer.println("spark.submit.pyFiles " + remotePyFiles)
+ }
val clArgs4 = Seq(
"--master", "yarn",
"--deploy-mode", "cluster",
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 86575b1..791814b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -770,11 +770,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// write out one totally bogus hidden file
val hiddenGarbageFile = new File(testDir, ".garbage")
- val out = new PrintWriter(hiddenGarbageFile)
- // scalastyle:off println
- out.println("GARBAGE")
- // scalastyle:on println
- out.close()
+ Utils.tryWithResource(new PrintWriter(hiddenGarbageFile)) { out =>
+ // scalastyle:off println
+ out.println("GARBAGE")
+ // scalastyle:on println
+ }
// also write out one real event log file, but since its a hidden file, we shouldn't read it
val tmpNewAppFile = newLogFile("hidden", None, inProgress = false)
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 6f4203d..c7bd0c9 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -51,13 +51,13 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
testTempDir.mkdir()
tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
- val pw = new PrintWriter(new FileWriter(tmpFile))
- for (x <- 1 to numRecords) {
- // scalastyle:off println
- pw.println(RandomUtils.nextInt(0, numBuckets))
- // scalastyle:on println
+ Utils.tryWithResource(new PrintWriter(tmpFile)) { pw =>
+ for (x <- 1 to numRecords) {
+ // scalastyle:off println
+ pw.println(RandomUtils.nextInt(0, numBuckets))
+ // scalastyle:on println
+ }
}
- pw.close()
// Path to tmpFile
tmpFilePath = tmpFile.toURI.toString
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d1113c7..7d0712b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -50,15 +50,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
test("Simple replay") {
val logFilePath = getFilePath(testDir, "events.txt")
val fstream = fileSystem.create(logFilePath)
- val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
- // scalastyle:off println
- writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
- writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
- // scalastyle:on println
- writer.close()
+ Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+ // scalastyle:off println
+ writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+ writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+ // scalastyle:on println
+ }
val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val logData = fileSystem.open(logFilePath)
@@ -132,16 +132,16 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
test("Replay incompatible event log") {
val logFilePath = getFilePath(testDir, "incompatible.txt")
val fstream = fileSystem.create(logFilePath)
- val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
125L, "UserUsingIncompatibleVersion", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
- // scalastyle:off println
- writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
- writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
- writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
- // scalastyle:on println
- writer.close()
+ Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+ // scalastyle:off println
+ writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+ writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
+ writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+ // scalastyle:on println
+ }
val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val logData = fileSystem.open(logFilePath)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 7e08675..1afcf1b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -22,6 +22,8 @@ import java.io.PrintWriter
import scala.reflect.ClassTag
import scala.xml.Utility
+import org.apache.spark.util.Utils
+
/**
* Code generator for shared params (sharedParams.scala). Run under the Spark folder with
* {{{
@@ -103,9 +105,9 @@ private[shared] object SharedParamsCodeGen {
val code = genSharedParams(params)
val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"
- val writer = new PrintWriter(file)
- writer.write(code)
- writer.close()
+ Utils.tryWithResource(new PrintWriter(file)) { writer =>
+ writer.write(code)
+ }
}
/** Description of a param. */
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
index 5e7388d..051320f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
+import org.apache.spark.util.Utils
class PodTemplateConfigMapStepSuite extends SparkFunSuite {
@@ -46,9 +47,9 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite {
.set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath)
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
- val writer = new PrintWriter(templateFile)
- writer.write("pod-template-contents")
- writer.close()
+ Utils.tryWithResource(new PrintWriter(templateFile)) { writer =>
+ writer.write("pod-template-contents")
+ }
val step = new PodTemplateConfigMapStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 12e8d02..eefabbf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -93,9 +93,9 @@ package object util extends Logging {
}
def stringToFile(file: File, str: String): File = {
- val out = new PrintWriter(file)
- out.write(str)
- out.close()
+ Utils.tryWithResource(new PrintWriter(file)) { out =>
+ out.write(str)
+ }
file
}
@@ -115,9 +115,10 @@ package object util extends Logging {
def stackTraceToString(t: Throwable): String = {
val out = new java.io.ByteArrayOutputStream
- val writer = new PrintWriter(out)
- t.printStackTrace(writer)
- writer.flush()
+ Utils.tryWithResource(new PrintWriter(out)) { writer =>
+ t.printStackTrace(writer)
+ writer.flush()
+ }
new String(out.toByteArray, StandardCharsets.UTF_8)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4859bde..0124f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2720,14 +2720,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
test("Refresh table before drop database cascade") {
withTempDir { tempDir =>
val file1 = new File(tempDir + "/first.csv")
- val writer1 = new PrintWriter(file1)
- writer1.write("first")
- writer1.close()
+ Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+ writer.write("first")
+ }
val file2 = new File(tempDir + "/second.csv")
- val writer2 = new PrintWriter(file2)
- writer2.write("second")
- writer2.close()
+ Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+ writer.write("second")
+ }
withDatabase("foo") {
withTable("foo.first") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 44b1362..483bd37 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
@@ -77,14 +78,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
withTempDir { tempDir =>
// EXTERNAL OpenCSVSerde table pointing to LOCATION
val file1 = new File(tempDir + "/data1")
- val writer1 = new PrintWriter(file1)
- writer1.write("1,2")
- writer1.close()
+ Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+ writer.write("1,2")
+ }
val file2 = new File(tempDir + "/data2")
- val writer2 = new PrintWriter(file2)
- writer2.write("1,2")
- writer2.close()
+ Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+ writer.write("1,2")
+ }
sql(
s"""
@@ -957,9 +958,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
withTempDir { loadPath =>
// load data command
val file = new File(loadPath + "/data")
- val writer = new PrintWriter(file)
- writer.write("2,xyz")
- writer.close()
+ Utils.tryWithResource(new PrintWriter(file)) { writer =>
+ writer.write("2,xyz")
+ }
sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
if (autoUpdate) {
val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
@@ -994,14 +995,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
val file1 = new File(dir1 + "/data")
- val writer1 = new PrintWriter(file1)
- writer1.write("1,a")
- writer1.close()
+ Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+ writer.write("1,a")
+ }
val file2 = new File(dir2 + "/data")
- val writer2 = new PrintWriter(file2)
- writer2.write("1,a")
- writer2.close()
+ Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+ writer.write("1,a")
+ }
// add partition command
sql(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3284579..9861a0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -949,9 +949,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
|}
""".stripMargin
val schemaFile = new File(dir, "avroDecimal.avsc")
- val writer = new PrintWriter(schemaFile)
- writer.write(avroSchema)
- writer.close()
+ Utils.tryWithResource(new PrintWriter(schemaFile)) { writer =>
+ writer.write(avroSchema)
+ }
val schemaPath = schemaFile.toURI.toString
val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index a6fc744..446267d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -454,14 +454,14 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
// EXTERNAL OpenCSVSerde table pointing to LOCATION
val file1 = new File(tempDir + "/data1")
- val writer1 = new PrintWriter(file1)
- writer1.write("1,2")
- writer1.close()
+ Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+ writer.write("1,2")
+ }
val file2 = new File(tempDir + "/data2")
- val writer2 = new PrintWriter(file2)
- writer2.write("1,2")
- writer2.close()
+ Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+ writer.write("1,2")
+ }
sql(
s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)