Posted to commits@spark.apache.org by we...@apache.org on 2018/02/14 02:55:33 UTC
spark git commit: [SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader
Repository: spark
Updated Branches:
refs/heads/master d6f5e172b -> 357babde5
[SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader
## What changes were proposed in this pull request?
This PR aims to resolve an open file leakage issue reported at [SPARK-23390](https://issues.apache.org/jira/browse/SPARK-23390) by moving the task completion listener registration earlier. Currently, the sequence is the following.
1. Create `batchReader`
2. `batchReader.initialize` opens an ORC file.
3. `batchReader.initBatch` may take a long time to allocate memory in some environments and cause errors.
4. `Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))`
This PR moves step 4 before steps 2 and 3. To sum up, the new sequence is 1 -> 4 -> 2 -> 3.
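The reordering above can be sketched outside of Spark with a minimal stand-in: a `Closeable` reader whose `initBatch` fails, and a listener list playing the role of `TaskContext`. The names `FakeBatchReader`, `ListenerOrderingSketch`, and `completionListeners` are illustrative assumptions, not Spark's API; the point is only that registering the close callback before the fallible steps guarantees `close()` runs even when step 3 throws.

```scala
import java.io.Closeable
import scala.collection.mutable.ArrayBuffer

// Stand-in for OrcColumnarBatchReader: initBatch fails after the "file" is open.
class FakeBatchReader extends Closeable {
  var closed = false
  def initialize(): Unit = ()                                      // step 2: opens a file
  def initBatch(): Unit = throw new OutOfMemoryError("simulated")  // step 3: fails
  override def close(): Unit = closed = true
}

object ListenerOrderingSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for TaskContext's completion listeners.
    val completionListeners = ArrayBuffer.empty[() => Unit]
    val reader = new FakeBatchReader()                 // step 1
    completionListeners += (() => reader.close())      // step 4: register FIRST
    try {
      reader.initialize()                              // step 2
      reader.initBatch()                               // step 3: OOM here
    } catch {
      case _: OutOfMemoryError => // the task fails...
    } finally {
      completionListeners.foreach(_())                 // ...but the listener still runs
    }
    println(s"reader closed: ${reader.closed}")        // prints: reader closed: true
  }
}
```

With the old ordering (registration after steps 2 and 3), the throw in `initBatch` would skip the registration entirely and the open reader would leak.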
## How was this patch tested?
Manual. The following test case raises an OOM intentionally to cause a leaked filesystem connection in the current code base. With this patch, the leakage doesn't occur.
```scala
// This should be tested manually because it raises OOM intentionally
// in order to cause `Leaked filesystem connection`.
test("SPARK-23399 Register a task completion listener first for OrcColumnarBatchReader") {
  withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
    withTempDir { dir =>
      val basePath = dir.getCanonicalPath
      Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
      Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
      val df = spark.read.orc(
        new Path(basePath, "first").toString,
        new Path(basePath, "second").toString)
      val e = intercept[SparkException] {
        df.collect()
      }
      assert(e.getCause.isInstanceOf[OutOfMemoryError])
    }
  }
}
```
Author: Dongjoon Hyun <do...@apache.org>
Closes #20590 from dongjoon-hyun/SPARK-23399.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/357babde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/357babde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/357babde
Branch: refs/heads/master
Commit: 357babde5a8eb9710de7016d7ae82dee21fa4ef3
Parents: d6f5e17
Author: Dongjoon Hyun <do...@apache.org>
Authored: Wed Feb 14 10:55:24 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Feb 14 10:55:24 2018 +0800
----------------------------------------------------------------------
.../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/357babde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index dbf3bc6..1de2ca2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -188,6 +188,12 @@ class OrcFileFormat
if (enableVectorizedReader) {
val batchReader = new OrcColumnarBatchReader(
enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
+ // SPARK-23399 Register a task completion listener first to call `close()` in all cases.
+ // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM)
+ // after opening a file.
+ val iter = new RecordReaderIterator(batchReader)
+ Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
batchReader.initialize(fileSplit, taskAttemptContext)
batchReader.initBatch(
reader.getSchema,
@@ -196,8 +202,6 @@ class OrcFileFormat
partitionSchema,
file.partitionValues)
- val iter = new RecordReaderIterator(batchReader)
- Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
iter.asInstanceOf[Iterator[InternalRow]]
} else {
val orcRecordReader = new OrcInputFormat[OrcStruct]