Posted to commits@spark.apache.org by ka...@apache.org on 2015/07/24 21:37:14 UTC

spark git commit: [SPARK-9067] [SQL] Close reader in NewHadoopRDD early if there is no more data

Repository: spark
Updated Branches:
  refs/heads/master 9a1139611 -> 64135cbb3


[SPARK-9067] [SQL] Close reader in NewHadoopRDD early if there is no more data

JIRA: https://issues.apache.org/jira/browse/SPARK-9067

According to the JIRA ticket, calling `reader.close()` only after the task finishes causes memory pressure and can exhaust the open-file limit, since those resources stay occupied even after they are no longer needed.

This PR simply closes the reader early, as soon as we know there is no more data to read.
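
For context, a minimal sketch of the pattern this commit introduces. The class name `EarlyClosingIterator` is hypothetical; the real change lives in the anonymous `Iterator` inside `NewHadoopRDD.compute`, which additionally updates input metrics and registers a task-completion callback:

  import org.apache.hadoop.mapreduce.RecordReader

  class EarlyClosingIterator[K, V](private var reader: RecordReader[K, V])
    extends Iterator[(K, V)] {

    private var finished = false
    private var havePair = false

    override def hasNext: Boolean = {
      if (!finished && !havePair) {
        finished = !reader.nextKeyValue()
        if (finished) {
          // Release the reader as soon as the input is exhausted, instead
          // of waiting for the task-completion callback to do it.
          close()
        }
        havePair = !finished
      }
      !finished
    }

    override def next(): (K, V) = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      havePair = false
      (reader.getCurrentKey, reader.getCurrentValue)
    }

    // Null out the reader so a later call (e.g. from a task-completion
    // listener) is a harmless no-op rather than a double close.
    private def close(): Unit = {
      if (reader != null) {
        reader.close()
        reader = null
      }
    }
  }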

Author: Liang-Chi Hsieh <vi...@appier.com>

Closes #7424 from viirya/close_reader and squashes the following commits:

3ff64e5 [Liang-Chi Hsieh] For comments.
3d20267 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
e152182 [Liang-Chi Hsieh] For comments.
5116cbe [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
3ceb755 [Liang-Chi Hsieh] For comments.
e34d98e [Liang-Chi Hsieh] For comments.
50ed729 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
216912f [Liang-Chi Hsieh] Fix it.
f429016 [Liang-Chi Hsieh] Release reader if we don't need it.
a305621 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
67569da [Liang-Chi Hsieh] Close reader early if there is no more data.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64135cbb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64135cbb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64135cbb

Branch: refs/heads/master
Commit: 64135cbb3363e3b74dad3c0498cb9959c047d381
Parents: 9a11396
Author: Liang-Chi Hsieh <vi...@appier.com>
Authored: Fri Jul 24 12:36:44 2015 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Jul 24 12:36:44 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 37 +++++++++++++-------
 .../spark/sql/execution/SqlNewHadoopRDD.scala   | 36 ++++++++++++-------
 2 files changed, 47 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64135cbb/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index f827270..f83a051 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
@@ -141,6 +141,12 @@ class NewHadoopRDD[K, V](
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
+          if (finished) {
+            // Close and release the reader here; close() will also be called when the task
+            // completes, but for tasks that read from many files, it helps to release the
+            // resources early.
+            close()
+          }
           havePair = !finished
         }
         !finished
@@ -159,18 +165,23 @@ class NewHadoopRDD[K, V](
 
       private def close() {
         try {
-          reader.close()
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-                     split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to the split size,
-            // which may be inaccurate.
-            try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for task", e)
+          if (reader != null) {
+            // Close reader and release it
+            reader.close()
+            reader = null
+
+            if (bytesReadCallback.isDefined) {
+              inputMetrics.updateBytesRead()
+            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                       split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+              // If we can't get the bytes read from the FS stats, fall back to the split size,
+              // which may be inaccurate.
+              try {
+                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+              } catch {
+                case e: java.io.IOException =>
+                  logWarning("Unable to get input size to set InputMetrics for task", e)
+              }
             }
           }
         } catch {

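The mechanical heart of the change above is that `reader` becomes a `private var` so that `close()` can null it out. That makes `close()` idempotent: it may now run twice, once from `hasNext` when the input is exhausted and once more from the task-completion callback, without double-closing the underlying stream. A sketch of that guard in isolation, with `java.io.Closeable` standing in for the Hadoop `RecordReader` (a hypothetical wrapper, not Spark code):

  import java.io.Closeable

  class CloseOnce(private var resource: Closeable) {
    def close(): Unit = {
      if (resource != null) {
        resource.close()   // release file handles and buffers now
        resource = null    // any later call becomes a no-op
      }
    }
  }

The same two-sided guard is applied verbatim to SqlNewHadoopRDD below.
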
http://git-wip-us.apache.org/repos/asf/spark/blob/64135cbb/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
index e1c1a6c..3d75b6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
@@ -147,7 +147,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
@@ -160,6 +160,12 @@ private[sql] class SqlNewHadoopRDD[K, V](
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
+          if (finished) {
+            // Close and release the reader here; close() will also be called when the task
+            // completes, but for tasks that read from many files, it helps to release the
+            // resources early.
+            close()
+          }
           havePair = !finished
         }
         !finished
@@ -178,18 +184,22 @@ private[sql] class SqlNewHadoopRDD[K, V](
 
       private def close() {
         try {
-          reader.close()
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-                     split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to the split size,
-            // which may be inaccurate.
-            try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for task", e)
+          if (reader != null) {
+            reader.close()
+            reader = null
+
+            if (bytesReadCallback.isDefined) {
+              inputMetrics.updateBytesRead()
+            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                       split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+              // If we can't get the bytes read from the FS stats, fall back to the split size,
+              // which may be inaccurate.
+              try {
+                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+              } catch {
+                case e: java.io.IOException =>
+                  logWarning("Unable to get input size to set InputMetrics for task", e)
+              }
             }
           }
         } catch {

