Posted to commits@spark.apache.org by jo...@apache.org on 2015/10/31 18:47:39 UTC

spark git commit: [SPARK-11424] Guard against double-close() of RecordReaders

Repository: spark
Updated Branches:
  refs/heads/master 97b3c8fb4 -> ac4118db2


[SPARK-11424] Guard against double-close() of RecordReaders

**TL;DR**: We can rule out one rare but plausible cause of input stream corruption via defensive programming.

## Background

[MAPREDUCE-5918](https://issues.apache.org/jira/browse/MAPREDUCE-5918) is a bug where an instance of a decompressor ends up being placed into a pool multiple times. Since the pool is backed by a list instead of a set, the same decompressor can then be handed out to different callers at the same time, which is unsafe because those callers end up sharing, and overwriting, the same internal buffers. Sometimes this buffer sharing leads to exceptions, but other times it might silently result in invalid / garbled input.
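
To illustrate why a list-backed pool is dangerous here, this is a minimal sketch of the failure mode; the names (`FakeDecompressor`, `ListBackedPool`) are invented for illustration and this is not Hadoop's actual `CodecPool` code:

```scala
import scala.collection.mutable

class FakeDecompressor

object ListBackedPool {
  private val pool = mutable.ListBuffer.empty[FakeDecompressor]

  // A double-close() of a RecordReader translates into two "return" calls for
  // the same decompressor; a list happily records both, whereas a set would not.
  def returnDecompressor(d: FakeDecompressor): Unit = pool += d

  def getDecompressor(): FakeDecompressor = pool.remove(0)
}

object PoolDemo extends App {
  val d = new FakeDecompressor
  ListBackedPool.returnDecompressor(d)
  ListBackedPool.returnDecompressor(d)        // buggy second return
  val user1 = ListBackedPool.getDecompressor()
  val user2 = ListBackedPool.getDecompressor()
  println(user1 eq user2)                     // true: two users now share one instance and its buffers
}
```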

That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop versions that we wish to support. As a result, I think that we should try to work around this issue in Spark via defensive programming to prevent RecordReaders from being closed multiple times.

So far, I've had a hard time pinning down exactly how double-`close()`s occur in practice, but I do have a couple of explanations that work on paper.

For instance, it looks like https://github.com/apache/spark/pull/7424, added in 1.5, introduces at least one extremely rare corner-case path where Spark could double-close() a `LineRecordReader` instance in a way that triggers the bug. Here are the steps involved in the bad execution path that I brainstormed up (a simplified sketch follows the list):

* [The task has finished reading input, so we call close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168).
* [While handling the close call and trying to close the reader, `reader.close()` throws an exception](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190).
* We don't set `reader = null` after handling this exception, so the [TaskCompletionListener also ends up calling NewHadoopRDD.close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156), which, in turn, closes the record reader again.
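
To make this hypothetical path concrete, here is a simplified, self-contained sketch; `FlakyRecordReader` and `ReaderIterator` are invented names and do not match the real `NewHadoopRDD` internals:

```scala
// Simplified model of the pre-fix behavior described in the steps above.
class FlakyRecordReader {
  private var closeCalls = 0
  def close(): Unit = {
    closeCalls += 1
    println(s"RecordReader.close() call #$closeCalls")
    // The first close() fails, e.g. because the underlying FileSystem is already closed.
    if (closeCalls == 1) throw new java.io.IOException("stream already closed")
  }
}

class ReaderIterator {
  private var reader: FlakyRecordReader = new FlakyRecordReader

  // Pre-fix close(): when reader.close() throws, the `reader = null` line is skipped.
  def close(): Unit = {
    if (reader != null) {
      reader.close()
      reader = null
    }
  }
}

object DoubleCloseDemo extends App {
  val it = new ReaderIterator
  // Steps 1-2: the task finishes reading and calls close(), which throws.
  try it.close() catch { case e: Exception => println(s"first close failed: $e") }
  // Step 3: the TaskCompletionListener fires and calls close() again; since
  // reader was never nulled, the record reader gets closed a second time.
  it.close()
}
```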

In this hypothetical situation, `LineRecordReader.close()` could [fail with an exception if its InputStream failed to close](https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212).
I googled for "Exception in RecordReader.close()" and it looks like a closed Hadoop FileSystem can trigger an error there: [SPARK-757](https://issues.apache.org/jira/browse/SPARK-757), [SPARK-2491](https://issues.apache.org/jira/browse/SPARK-2491).

Looking at [SPARK-3052](https://issues.apache.org/jira/browse/SPARK-3052), it seems possible to get spurious exceptions there when there is an error reading from Hadoop. If the Hadoop FileSystem were to get into an error state _right_ after reading the last record, then it looks like we could hit this bug in Spark 1.5.

## The fix

This patch guards against these issues by modifying `HadoopRDD.close()`, `NewHadoopRDD.close()`, and `SqlNewHadoopRDD.close()` so that they set `reader = null` even if an exception occurs in the `reader.close()` call. In addition, I modified `NextIterator.closeIfNeeded()` to guard against double-close if the first `close()` call throws an exception.
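
Condensed, the two guards look like the sketch below; the authoritative versions are in the diff further down, and `GuardedIterator` is just an illustrative name, not a real Spark class:

```scala
class GuardedIterator(private var reader: java.io.Closeable) {
  private var closed = false

  // Mirror of the RDD fix: null out `reader` in a finally block, so even a
  // failing reader.close() leaves no way to close the reader a second time.
  def closeReader(): Unit = {
    if (reader != null) {
      try {
        reader.close()
      } catch {
        case e: Exception => println(s"Exception in RecordReader.close(): $e")
      } finally {
        reader = null
      }
    }
  }

  // Mirror of the NextIterator fix: flip `closed` *before* calling close(), so
  // an exception thrown by close() cannot leave the iterator re-closable.
  def closeIfNeeded(): Unit = {
    if (!closed) {
      closed = true
      closeReader()
    }
  }
}
```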

I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and appear to rule out the on-paper reproductions that I was able to brainstorm up.

Author: Josh Rosen <jo...@databricks.com>

Closes #9382 from JoshRosen/hadoop-decompressor-pooling-fix and squashes the following commits:

5ec97d7 [Josh Rosen] Add SqlNewHadoopRDD.unsetInputFileName() that I accidentally deleted.
ae46cf4 [Josh Rosen] Merge remote-tracking branch 'origin/master' into hadoop-decompressor-pooling-fix
087aa63 [Josh Rosen] Guard against double-close() of RecordReaders.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac4118db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac4118db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac4118db

Branch: refs/heads/master
Commit: ac4118db2dda802b936bb7a18a08844846c71285
Parents: 97b3c8f
Author: Josh Rosen <jo...@databricks.com>
Authored: Sat Oct 31 10:47:22 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sat Oct 31 10:47:22 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 23 ++++++----
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 44 +++++++++---------
 .../org/apache/spark/rdd/SqlNewHadoopRDD.scala  | 47 +++++++++++---------
 .../org/apache/spark/util/NextIterator.scala    |  4 +-
 4 files changed, 66 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac4118db/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 77b5713..d841f05 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -251,8 +251,21 @@ class HadoopRDD[K, V](
       }
 
       override def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
@@ -266,12 +279,6 @@ class HadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac4118db/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2872b93..9c4b708 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -184,30 +184,32 @@ class NewHadoopRDD[K, V](
       }
 
       private def close() {
-        try {
-          if (reader != null) {
-            // Close reader and release it
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
             reader.close()
-            reader = null
-
-            if (bytesReadCallback.isDefined) {
-              inputMetrics.updateBytesRead()
-            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-                       split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-              // If we can't get the bytes read from the FS stats, fall back to the split size,
-              // which may be inaccurate.
-              try {
-                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-              } catch {
-                case e: java.io.IOException =>
-                  logWarning("Unable to get input size to set InputMetrics for task", e)
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
               }
-            }
+          } finally {
+            reader = null
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                     split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac4118db/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 0228c54..264dae7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -189,32 +189,35 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       }
 
       private def close() {
-        try {
-          if (reader != null) {
+        if (reader != null) {
+          SqlNewHadoopRDD.unsetInputFileName()
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
             reader.close()
-            reader = null
-
-            SqlNewHadoopRDD.unsetInputFileName()
-
-            if (bytesReadCallback.isDefined) {
-              inputMetrics.updateBytesRead()
-            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-                       split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-              // If we can't get the bytes read from the FS stats, fall back to the split size,
-              // which may be inaccurate.
-              try {
-                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-              } catch {
-                case e: java.io.IOException =>
-                  logWarning("Unable to get input size to set InputMetrics for task", e)
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
               }
-            }
+          } finally {
+            reader = null
           }
-        } catch {
-          case e: Exception =>
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                     split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
             }
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac4118db/core/src/main/scala/org/apache/spark/util/NextIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
index e5c732a..0b505a5 100644
--- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
@@ -60,8 +60,10 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
    */
   def closeIfNeeded() {
     if (!closed) {
-      close()
+      // Note: it's important that we set closed = true before calling close(), since setting it
+      // afterwards would permit us to call close() multiple times if close() threw an exception.
       closed = true
+      close()
     }
   }
 

