Posted to commits@spark.apache.org by gu...@apache.org on 2019/05/30 10:54:59 UTC

[spark] branch master updated: [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new db3e746  [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource
db3e746 is described below

commit db3e746b64a3f78ce60bcfd6f372735f574da95a
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Thu May 30 19:54:32 2019 +0900

    [SPARK-27875][CORE][SQL][ML][K8S] Wrap all PrintWriter with Utils.tryWithResource
    
    ## What changes were proposed in this pull request?
    
    This PR wraps all `PrintWriter` usages in `Utils.tryWithResource` to prevent resource leaks.
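    
    For illustration, here is a minimal, self-contained sketch of the pattern. The body of `tryWithResource` below is an assumption based on the usual loan pattern; the real definition lives in `org.apache.spark.util.Utils`.
    
    ```scala
    import java.io.{Closeable, File, PrintWriter}
    
    object TryWithResourceSketch {
      // Assumed shape of Utils.tryWithResource: close() runs even if f throws.
      def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
        val resource = createResource
        try f(resource) finally resource.close()
      }
    
      def main(args: Array[String]): Unit = {
        val file = File.createTempFile("demo", ".txt")
        // Before: the writer leaks if write() throws before close() is reached.
        //   val writer = new PrintWriter(file)
        //   writer.write("data")
        //   writer.close()
        // After: the writer is always closed, on success or failure.
        tryWithResource(new PrintWriter(file)) { writer =>
          writer.write("data")
        }
      }
    }
    ```
    
    The diff below applies this rewrite mechanically at each call site.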
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24739 from wangyum/SPARK-27875.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../spark/api/python/PythonBroadcastSuite.scala    |  6 ++---
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 28 +++++++++----------
 .../deploy/history/FsHistoryProviderSuite.scala    | 10 +++----
 .../spark/metrics/InputOutputMetricsSuite.scala    | 12 ++++-----
 .../spark/scheduler/ReplayListenerSuite.scala      | 26 +++++++++---------
 .../ml/param/shared/SharedParamsCodeGen.scala      |  8 +++---
 .../features/PodTemplateConfigMapStepSuite.scala   |  7 ++---
 .../apache/spark/sql/catalyst/util/package.scala   | 13 ++++-----
 .../spark/sql/execution/command/DDLSuite.scala     | 12 ++++-----
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 31 +++++++++++-----------
 .../spark/sql/hive/client/VersionsSuite.scala      |  6 ++---
 .../spark/sql/hive/execution/HiveUDFSuite.scala    | 12 ++++-----
 12 files changed, 88 insertions(+), 83 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
index 24004de..dffdd96 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonBroadcastSuite.scala
@@ -42,9 +42,9 @@ class PythonBroadcastSuite extends SparkFunSuite with Matchers with SharedSparkC
     withTempDir { tempDir =>
       val broadcastDataFile: File = {
         val file = new File(tempDir, "broadcastData")
-        val printWriter = new PrintWriter(file)
-        printWriter.write(broadcastedString)
-        printWriter.close()
+        Utils.tryWithResource(new PrintWriter(file)) { printWriter =>
+          printWriter.write(broadcastedString)
+        }
         file
       }
       val broadcast = new PythonBroadcast(broadcastDataFile.getAbsolutePath)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 65c9cb9..385f549 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -749,10 +749,10 @@ class SparkSubmitSuite
     withTempDir { tmpDir =>
       // Test jars and files
       val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
-      val writer1 = new PrintWriter(f1)
-      writer1.println("spark.jars " + jars)
-      writer1.println("spark.files " + files)
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(f1)) { writer =>
+        writer.println("spark.jars " + jars)
+        writer.println("spark.files " + files)
+      }
       val clArgs = Seq(
         "--master", "local",
         "--class", "org.SomeClass",
@@ -766,10 +766,10 @@ class SparkSubmitSuite
 
       // Test files and archives (Yarn)
       val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
-      val writer2 = new PrintWriter(f2)
-      writer2.println("spark.yarn.dist.files " + files)
-      writer2.println("spark.yarn.dist.archives " + archives)
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(f2)) { writer =>
+        writer.println("spark.yarn.dist.files " + files)
+        writer.println("spark.yarn.dist.archives " + archives)
+      }
       val clArgs2 = Seq(
         "--master", "yarn",
         "--class", "org.SomeClass",
@@ -783,9 +783,9 @@ class SparkSubmitSuite
 
       // Test python files
       val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
-      val writer3 = new PrintWriter(f3)
-      writer3.println("spark.submit.pyFiles " + pyFiles)
-      writer3.close()
+      Utils.tryWithResource(new PrintWriter(f3)) { writer =>
+        writer.println("spark.submit.pyFiles " + pyFiles)
+      }
       val clArgs3 = Seq(
         "--master", "local",
         "--properties-file", f3.getPath,
@@ -802,10 +802,10 @@ class SparkSubmitSuite
       val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
       val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
       val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
-      val writer4 = new PrintWriter(f4)
       val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
-      writer4.println("spark.submit.pyFiles " + remotePyFiles)
-      writer4.close()
+      Utils.tryWithResource(new PrintWriter(f4)) { writer =>
+        writer.println("spark.submit.pyFiles " + remotePyFiles)
+      }
       val clArgs4 = Seq(
         "--master", "yarn",
         "--deploy-mode", "cluster",
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 86575b1..791814b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -770,11 +770,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
 
     // write out one totally bogus hidden file
     val hiddenGarbageFile = new File(testDir, ".garbage")
-    val out = new PrintWriter(hiddenGarbageFile)
-    // scalastyle:off println
-    out.println("GARBAGE")
-    // scalastyle:on println
-    out.close()
+    Utils.tryWithResource(new PrintWriter(hiddenGarbageFile)) { out =>
+      // scalastyle:off println
+      out.println("GARBAGE")
+      // scalastyle:on println
+    }
 
    // also write out one real event log file, but since it's a hidden file, we shouldn't read it
     val tmpNewAppFile = newLogFile("hidden", None, inProgress = false)
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 6f4203d..c7bd0c9 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -51,13 +51,13 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
     testTempDir.mkdir()
 
     tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
-    val pw = new PrintWriter(new FileWriter(tmpFile))
-    for (x <- 1 to numRecords) {
-      // scalastyle:off println
-      pw.println(RandomUtils.nextInt(0, numBuckets))
-      // scalastyle:on println
+    Utils.tryWithResource(new PrintWriter(tmpFile)) { pw =>
+      for (x <- 1 to numRecords) {
+        // scalastyle:off println
+        pw.println(RandomUtils.nextInt(0, numBuckets))
+        // scalastyle:on println
+      }
     }
-    pw.close()
 
     // Path to tmpFile
     tmpFilePath = tmpFile.toURI.toString
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d1113c7..7d0712b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -50,15 +50,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Simple replay") {
     val logFilePath = getFilePath(testDir, "events.txt")
     val fstream = fileSystem.create(logFilePath)
-    val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
-    // scalastyle:off println
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
-    // scalastyle:on println
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+      // scalastyle:off println
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      // scalastyle:on println
+    }
 
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val logData = fileSystem.open(logFilePath)
@@ -132,16 +132,16 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Replay incompatible event log") {
     val logFilePath = getFilePath(testDir, "incompatible.txt")
     val fstream = fileSystem.create(logFilePath)
-    val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
       125L, "UserUsingIncompatibleVersion", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
-    // scalastyle:off println
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-    writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
-    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
-    // scalastyle:on println
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+      // scalastyle:off println
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+      writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
+      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      // scalastyle:on println
+    }
 
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
     val logData = fileSystem.open(logFilePath)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 7e08675..1afcf1b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -22,6 +22,8 @@ import java.io.PrintWriter
 import scala.reflect.ClassTag
 import scala.xml.Utility
 
+import org.apache.spark.util.Utils
+
 /**
  * Code generator for shared params (sharedParams.scala). Run under the Spark folder with
  * {{{
@@ -103,9 +105,9 @@ private[shared] object SharedParamsCodeGen {
 
     val code = genSharedParams(params)
     val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"
-    val writer = new PrintWriter(file)
-    writer.write(code)
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(file)) { writer =>
+      writer.write(code)
+    }
   }
 
   /** Description of a param. */
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
index 5e7388d..051320f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -23,6 +23,7 @@ import io.fabric8.kubernetes.api.model.ConfigMap
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
+import org.apache.spark.util.Utils
 
 class PodTemplateConfigMapStepSuite extends SparkFunSuite {
 
@@ -46,9 +47,9 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite {
       .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath)
     val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
 
-    val writer = new PrintWriter(templateFile)
-    writer.write("pod-template-contents")
-    writer.close()
+    Utils.tryWithResource(new PrintWriter(templateFile)) { writer =>
+      writer.write("pod-template-contents")
+    }
 
     val step = new PodTemplateConfigMapStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 12e8d02..eefabbf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -93,9 +93,9 @@ package object util extends Logging {
   }
 
   def stringToFile(file: File, str: String): File = {
-    val out = new PrintWriter(file)
-    out.write(str)
-    out.close()
+    Utils.tryWithResource(new PrintWriter(file)) { out =>
+      out.write(str)
+    }
     file
   }
 
@@ -115,9 +115,10 @@ package object util extends Logging {
 
   def stackTraceToString(t: Throwable): String = {
     val out = new java.io.ByteArrayOutputStream
-    val writer = new PrintWriter(out)
-    t.printStackTrace(writer)
-    writer.flush()
+    Utils.tryWithResource(new PrintWriter(out)) { writer =>
+      t.printStackTrace(writer)
+      writer.flush()
+    }
     new String(out.toByteArray, StandardCharsets.UTF_8)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4859bde..0124f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2720,14 +2720,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   test("Refresh table before drop database cascade") {
     withTempDir { tempDir =>
       val file1 = new File(tempDir + "/first.csv")
-      val writer1 = new PrintWriter(file1)
-      writer1.write("first")
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+        writer.write("first")
+      }
 
       val file2 = new File(tempDir + "/second.csv")
-      val writer2 = new PrintWriter(file2)
-      writer2.write("second")
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+        writer.write("second")
+      }
 
       withDatabase("foo") {
         withTable("foo.first") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 44b1362..483bd37 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
@@ -77,14 +78,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         withTempDir { tempDir =>
           // EXTERNAL OpenCSVSerde table pointing to LOCATION
           val file1 = new File(tempDir + "/data1")
-          val writer1 = new PrintWriter(file1)
-          writer1.write("1,2")
-          writer1.close()
+          Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+            writer.write("1,2")
+          }
 
           val file2 = new File(tempDir + "/data2")
-          val writer2 = new PrintWriter(file2)
-          writer2.write("1,2")
-          writer2.close()
+          Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+            writer.write("1,2")
+          }
 
           sql(
             s"""
@@ -957,9 +958,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
           withTempDir { loadPath =>
             // load data command
             val file = new File(loadPath + "/data")
-            val writer = new PrintWriter(file)
-            writer.write("2,xyz")
-            writer.close()
+            Utils.tryWithResource(new PrintWriter(file)) { writer =>
+              writer.write("2,xyz")
+            }
             sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
             if (autoUpdate) {
               val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
@@ -994,14 +995,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
           withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
             val file1 = new File(dir1 + "/data")
-            val writer1 = new PrintWriter(file1)
-            writer1.write("1,a")
-            writer1.close()
+            Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+              writer.write("1,a")
+            }
 
             val file2 = new File(dir2 + "/data")
-            val writer2 = new PrintWriter(file2)
-            writer2.write("1,a")
-            writer2.close()
+            Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+              writer.write("1,a")
+            }
 
             // add partition command
             sql(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3284579..9861a0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -949,9 +949,9 @@ class VersionsSuite extends SparkFunSuite with Logging {
             |}
           """.stripMargin
         val schemaFile = new File(dir, "avroDecimal.avsc")
-        val writer = new PrintWriter(schemaFile)
-        writer.write(avroSchema)
-        writer.close()
+        Utils.tryWithResource(new PrintWriter(schemaFile)) { writer =>
+          writer.write(avroSchema)
+        }
         val schemaPath = schemaFile.toURI.toString
 
         val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index a6fc744..446267d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -454,14 +454,14 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
       // EXTERNAL OpenCSVSerde table pointing to LOCATION
 
       val file1 = new File(tempDir + "/data1")
-      val writer1 = new PrintWriter(file1)
-      writer1.write("1,2")
-      writer1.close()
+      Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+        writer.write("1,2")
+      }
 
       val file2 = new File(tempDir + "/data2")
-      val writer2 = new PrintWriter(file2)
-      writer2.write("1,2")
-      writer2.close()
+      Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+        writer.write("1,2")
+      }
 
       sql(
         s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)

