You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/09/28 07:59:03 UTC
spark git commit: [SPARK-17666] Ensure that RecordReaders are closed
by data source file scans (backport)
Repository: spark
Updated Branches:
refs/heads/branch-2.0 2cd327ef5 -> 1b02f8820
[SPARK-17666] Ensure that RecordReaders are closed by data source file scans (backport)
This is a branch-2.0 backport of #15245.
## What changes were proposed in this pull request?
This patch addresses a potential cause of resource leaks in data source file scans. As reported in [SPARK-17666](https://issues.apache.org/jira/browse/SPARK-17666), tasks that do not fully consume their input may cause file handles / network connections (e.g. S3 connections) to be leaked. Spark's `NewHadoopRDD` uses a TaskContext callback to [close its record readers](https://github.com/apache/spark/blame/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L208), but the new data source file scans will only close record readers once their iterators are fully consumed.
This patch modifies `RecordReaderIterator` and `HadoopFileLinesReader` to add `close()` methods and modifies all six implementations of `FileFormat.buildReader()` to register TaskContext task completion callbacks to guarantee that cleanup is eventually performed.
## How was this patch tested?
Tested manually for now.
Author: Josh Rosen <jo...@databricks.com>
Closes #15271 from JoshRosen/SPARK-17666-backport.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b02f882
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b02f882
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b02f882
Branch: refs/heads/branch-2.0
Commit: 1b02f8820ddaf3f2a0e7acc9a7f27afc20683cca
Parents: 2cd327e
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed Sep 28 00:59:00 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Sep 28 00:59:00 2016 -0700
----------------------------------------------------------------------
.../spark/ml/source/libsvm/LibSVMRelation.scala | 7 +++++--
.../datasources/HadoopFileLinesReader.scala | 6 +++++-
.../datasources/RecordReaderIterator.scala | 21 ++++++++++++++++++--
.../datasources/csv/CSVFileFormat.scala | 5 ++++-
.../datasources/json/JsonFileFormat.scala | 5 ++++-
.../datasources/parquet/ParquetFileFormat.scala | 3 ++-
.../datasources/text/TextFileFormat.scala | 2 ++
.../spark/sql/hive/orc/OrcFileFormat.scala | 6 +++++-
8 files changed, 46 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 034223e..ac95b92 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.spark.TaskContext
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.mllib.util.MLUtils
@@ -160,8 +161,10 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
- val points =
- new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+ val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+
+ val points = linesReader
.map(_.toString.trim)
.filterNot(line => line.isEmpty || line.startsWith("#"))
.map { line =>
http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index 18f9b55..83cf26c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources
+import java.io.Closeable
import java.net.URI
import org.apache.hadoop.conf.Configuration
@@ -30,7 +31,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
* An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which are all of the lines
* in that file.
*/
-class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] {
+class HadoopFileLinesReader(
+ file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable {
private val iterator = {
val fileSplit = new FileSplit(
new Path(new URI(file.filePath)),
@@ -48,4 +50,6 @@ class HadoopFileLinesReader(file: PartitionedFile, conf: Configuration) extends
override def hasNext: Boolean = iterator.hasNext
override def next(): Text = iterator.next()
+
+ override def close(): Unit = iterator.close()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
index f03ae94..938af25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.io.Closeable
+
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.sql.catalyst.InternalRow
@@ -27,7 +29,8 @@ import org.apache.spark.sql.catalyst.InternalRow
* Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
* column batches by pretending they are rows.
*/
-class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
+class RecordReaderIterator[T](
+ private[this] var rowReader: RecordReader[_, T]) extends Iterator[T] with Closeable {
private[this] var havePair = false
private[this] var finished = false
@@ -38,7 +41,7 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
// Close and release the reader here; close() will also be called when the task
// completes, but for tasks that read from many files, it helps to release the
// resources early.
- rowReader.close()
+ close()
}
havePair = !finished
}
@@ -52,4 +55,18 @@ class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T]
havePair = false
rowReader.getCurrentValue
}
+
+ override def close(): Unit = {
+ if (rowReader != null) {
+ try {
+ // Close the reader and release it. Note: it's very important that we don't close the
+ // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+ // older Hadoop 2.x releases. That bug can lead to non-deterministic corruption issues
+ // when reading compressed input.
+ rowReader.close()
+ } finally {
+ rowReader = null
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 4a60f51..107b600 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce._
+import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -111,7 +112,9 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
(file: PartitionedFile) => {
val lineIterator = {
val conf = broadcastedHadoopConf.value.value
- new HadoopFileLinesReader(file, conf).map { line =>
+ val linesReader = new HadoopFileLinesReader(file, conf)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+ linesReader.map { line =>
new String(line.getBytes, 0, line.getLength, csvOptions.charset)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index decbdda..cba3255 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -106,7 +107,9 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
(file: PartitionedFile) => {
- val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
+ val linesReader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+ val lines = linesReader.map(_.toString)
JacksonParser.parseJson(
lines,
http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9498088..aef0f1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType
import org.slf4j.bridge.SLF4JBridgeHandler
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -387,6 +387,7 @@ class ParquetFileFormat
}
val iter = new RecordReaderIterator(parquetReader)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index abb6059..6aa078a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.spark.TaskContext
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -101,6 +102,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
(file: PartitionedFile) => {
val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
if (requiredSchema.isEmpty) {
val emptyUnsafeRow = new UnsafeRow(0)
http://git-wip-us.apache.org/repos/asf/spark/blob/1b02f882/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index d15fb84..fc126b3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, Outp
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.{Row, SparkSession}
@@ -150,12 +151,15 @@ class OrcFileFormat
new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
}
+ val recordsIterator = new RecordReaderIterator[OrcStruct](orcRecordReader)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => recordsIterator.close()))
+
// Unwraps `OrcStruct`s to `UnsafeRow`s
OrcRelation.unwrapOrcStructs(
conf,
requiredSchema,
Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
- new RecordReaderIterator[OrcStruct](orcRecordReader))
+ recordsIterator)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org