You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/04/19 02:49:28 UTC
git commit: SPARK-1482: Fix potential resource leaks in
saveAsHadoopDataset and saveAsNewAPIHadoopDataset
Repository: spark
Updated Branches:
refs/heads/master c399baa0f -> 2089e0e7e
SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset
`writer.close` should be called in a `finally` block so that the writer is closed even if writing a record throws, avoiding potential resource leaks.
JIRA: https://issues.apache.org/jira/browse/SPARK-1482
Author: zsxwing <zs...@gmail.com>
Closes #400 from zsxwing/SPARK-1482 and squashes the following commits:
06b197a [zsxwing] SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2089e0e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2089e0e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2089e0e7
Branch: refs/heads/master
Commit: 2089e0e7e7c73656daee7b56f8100332f4d2175c
Parents: c399baa
Author: zsxwing <zs...@gmail.com>
Authored: Fri Apr 18 17:49:22 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Apr 18 17:49:22 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/rdd/PairRDDFunctions.scala | 30 ++++++++++++--------
1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2089e0e7/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 343e432..d250bef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -693,11 +693,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
- while (iter.hasNext) {
- val (k, v) = iter.next()
- writer.write(k, v)
+ try {
+ while (iter.hasNext) {
+ val (k, v) = iter.next()
+ writer.write(k, v)
+ }
+ }
+ finally {
+ writer.close(hadoopContext)
}
- writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}
@@ -750,15 +754,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
-
- var count = 0
- while(iter.hasNext) {
- val record = iter.next()
- count += 1
- writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+ try {
+ var count = 0
+ while(iter.hasNext) {
+ val record = iter.next()
+ count += 1
+ writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+ }
+ }
+ finally {
+ writer.close()
}
-
- writer.close()
writer.commit()
}