You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/08/11 06:18:36 UTC

[spark] branch master updated: [SPARK-36456][CORE][SQL][SS] Clean up compilation warnings related to `method closeQuietly in class IOUtils is deprecated`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f7c85b8  [SPARK-36456][CORE][SQL][SS] Clean up compilation warnings related to `method closeQuietly in class IOUtils is deprecated`
f7c85b8 is described below

commit f7c85b855ba99757c750dd0a2f7aced788c89374
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Tue Aug 10 23:17:57 2021 -0700

    [SPARK-36456][CORE][SQL][SS] Clean up compilation warnings related to `method closeQuietly in class IOUtils is deprecated`
    
    ### What changes were proposed in this pull request?
    There are some compilation warnings related to `method closeQuietly in class IOUtils is deprecated`. `Apache commons-io` suggests using the `try-with-resources` statement or handling suppressed exceptions manually instead.
    
    The main change of this PR is to replace `o.a.commons.io.IOUtils.closeQuietly` with `o.a.s.network.util.JavaUtils.closeQuietly` directly, because all of the original call sites were only suppressing `IOException`.
    
    ### Why are the changes needed?
    Clean up compilation warnings related to `method closeQuietly in class IOUtils is deprecated`
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Pass the Jenkins or GitHub Actions checks
    
    Closes #33682 from LuciferYang/closeQuietly.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala     | 5 ++---
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala      | 5 ++---
 core/src/main/scala/org/apache/spark/util/Utils.scala               | 4 ++--
 .../scala/org/apache/spark/util/logging/RollingFileAppender.scala   | 5 +++--
 core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala   | 3 ++-
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala          | 6 +++---
 .../org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala  | 4 ++--
 .../org/apache/spark/sql/execution/streaming/StreamMetadata.scala   | 4 ++--
 .../execution/streaming/state/HDFSBackedStateStoreProvider.scala    | 4 ++--
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala    | 5 +++--
 10 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4c052db..92b3608 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,7 +35,6 @@ import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
 import com.github.benmanes.caffeine.cache.Caffeine
-import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
@@ -52,7 +51,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-import org.apache.spark.network.util.TransportConf
+import org.apache.spark.network.util.{JavaUtils, TransportConf}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
@@ -342,7 +341,7 @@ private[spark] class BlockManager(
             false
         }
       } finally {
-        IOUtils.closeQuietly(inputStream)
+        JavaUtils.closeQuietly(inputStream)
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index eaecf65..b1713ec 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -29,7 +29,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.{Failure, Success}
 
 import io.netty.util.internal.OutOfDirectMemoryError
-import org.apache.commons.io.IOUtils
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.{MapOutputTracker, TaskContext}
@@ -39,7 +38,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
-import org.apache.spark.network.util.{NettyUtils, TransportConf}
+import org.apache.spark.network.util.{JavaUtils, NettyUtils, TransportConf}
 import org.apache.spark.shuffle.ShuffleReadMetricsReporter
 import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils}
 
@@ -1304,7 +1303,7 @@ private class BufferReleasingInputStream(
         val diagnosisResponse = checkedInOpt.map { checkedIn =>
           iterator.diagnoseCorruption(checkedIn, address, blockId)
         }
-        IOUtils.closeQuietly(this)
+        JavaUtils.closeQuietly(this)
         // We'd never retry the block whatever the cause is since the block has been
         // partially consumed by downstream RDDs.
         iterator.throwFetchFailedException(blockId, mapIndex, address, e, diagnosisResponse)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f3268cb..e1c26a4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3139,8 +3139,8 @@ private[spark] object Utils extends Logging {
       logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
     } finally {
       // Close everything no matter what happened
-      IOUtils.closeQuietly(in)
-      IOUtils.closeQuietly(out)
+      JavaUtils.closeQuietly(in)
+      JavaUtils.closeQuietly(out)
     }
     files.toSeq
   }
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 68a5923..10363a9 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
+import org.apache.spark.network.util.JavaUtils
 
 /**
  * Continuously appends data from input stream into the given file, and rolls
@@ -94,8 +95,8 @@ private[spark] class RollingFileAppender(
         gzOutputStream.close()
         activeFile.delete()
       } finally {
-        IOUtils.closeQuietly(inputStream)
-        IOUtils.closeQuietly(gzOutputStream)
+        JavaUtils.closeQuietly(inputStream)
+        JavaUtils.closeQuietly(gzOutputStream)
       }
     } else {
       Files.move(activeFile, rolloverFile)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 71010a1..1197bea 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy}
 
 class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
@@ -380,7 +381,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
         try {
           IOUtils.toString(inputStream, StandardCharsets.UTF_8)
         } finally {
-          IOUtils.closeQuietly(inputStream)
+          JavaUtils.closeQuietly(inputStream)
         }
       } else {
         Files.toString(file, StandardCharsets.UTF_8)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 677efec..c1b7b5f 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.util.io.ChunkedByteBufferInputStream
 
@@ -245,8 +245,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
         assert(mergedStream.read() === -1)
         assert(byteBufferInputStream.chunkedByteBuffer === null)
       } finally {
-        IOUtils.closeQuietly(mergedStream)
-        IOUtils.closeQuietly(in)
+        JavaUtils.closeQuietly(mergedStream)
+        JavaUtils.closeQuietly(in)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 8a037b5..2a4e064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -22,12 +22,12 @@ import java.nio.charset.StandardCharsets
 
 import scala.reflect.ClassTag
 
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
@@ -147,7 +147,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           throw new IllegalStateException(
             s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
       } finally {
-        IOUtils.closeQuietly(input)
+        JavaUtils.closeQuietly(input)
       }
     } else {
       throw QueryExecutionErrors.batchMetadataFileNotFoundError(batchMetadataFile)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index cb18988..b46be4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -22,13 +22,13 @@ import java.nio.charset.StandardCharsets
 
 import scala.util.control.NonFatal
 
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 
@@ -63,7 +63,7 @@ object StreamMetadata extends Logging {
           logError(s"Error reading stream metadata from $metadataFile", e)
           throw e
       } finally {
-        IOUtils.closeQuietly(input)
+        JavaUtils.closeQuietly(input)
       }
     } else None
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 75b7dae..ce2bbe8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -27,13 +27,13 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
@@ -542,7 +542,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       rawStream: CancellableFSDataOutputStream): Unit = {
     try {
       if (rawStream != null) rawStream.cancel()
-      IOUtils.closeQuietly(compressedStream)
+      JavaUtils.closeQuietly(compressedStream)
     } catch {
       case e: FSError if e.getCause.isInstanceOf[IOException] =>
         // Closing the compressedStream causes the stream to write/flush flush data into the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 23cdbd0..3378064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -38,6 +38,7 @@ import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.util.Utils
 
@@ -458,8 +459,8 @@ class RocksDBFileManager(
         throw e
     } finally {
       // Close everything no matter what happened
-      IOUtils.closeQuietly(in)
-      IOUtils.closeQuietly(zout)
+      JavaUtils.closeQuietly(in)
+      JavaUtils.closeQuietly(zout)
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org