You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/06/23 18:08:36 UTC

spark git commit: [SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffle writer

Repository: spark
Updated Branches:
  refs/heads/master 6ceb16960 -> 0f92be5b5


[SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffle writer

Author: Holden Karau <ho...@pigscanfly.ca>

Closes #6918 from holdenk/SPARK-8498-fix-npe-in-errorhandling-path-in-unsafeshuffle-writer and squashes the following commits:

f807832 [Holden Karau] Log error if we can't throw it
855f9aa [Holden Karau] Spelling - not my strongest suit. Fix Propegates to Propagates.
039d620 [Holden Karau] Add missing closeandwriteoutput
30e558d [Holden Karau] go back to try/finally
e503b8c [Holden Karau] Improve the test to ensure we aren't masking the underlying exception
ae0b7a7 [Holden Karau] Fix the test
2e6abf7 [Holden Karau] Be more cautious when cleaning up during failed write and re-throw user exceptions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f92be5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f92be5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f92be5b

Branch: refs/heads/master
Commit: 0f92be5b5f017b593bd29d4da7e89aad2b3adac2
Parents: 6ceb169
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Tue Jun 23 09:08:11 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Jun 23 09:08:11 2015 -0700

----------------------------------------------------------------------
 .../spark/shuffle/unsafe/UnsafeShuffleWriter.java | 18 ++++++++++++++++--
 .../shuffle/unsafe/UnsafeShuffleWriterSuite.java  | 17 +++++++++++++++++
 2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0f92be5b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
index ad7eb04..764578b 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -139,6 +139,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
 
   @Override
   public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
+    // Keep track of success so we know if we encountered an exception.
+    // We do this rather than a standard try/catch/re-throw to handle
+    // generic throwables.
     boolean success = false;
     try {
       while (records.hasNext()) {
@@ -147,8 +150,19 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       closeAndWriteOutput();
       success = true;
     } finally {
-      if (!success) {
-        sorter.cleanupAfterError();
+      if (sorter != null) {
+        try {
+          sorter.cleanupAfterError();
+        } catch (Exception e) {
+          // Only throw this error if we won't be masking another
+          // error.
+          if (success) {
+            throw e;
+          } else {
+            logger.error("In addition to a failure during writing, we failed during " +
+                         "cleanup.", e);
+          }
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0f92be5b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 83d1091..10c3eed 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -253,6 +253,23 @@ public class UnsafeShuffleWriterSuite {
     createWriter(false).stop(false);
   }
 
+  class PandaException extends RuntimeException {
+  }
+
+  @Test(expected=PandaException.class)
+  public void writeFailurePropagates() throws Exception {
+    class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
+      @Override public boolean hasNext() {
+        throw new PandaException();
+      }
+      @Override public Product2<Object, Object> next() {
+        return null;
+      }
+    }
+    final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
+    writer.write(new BadRecords());
+  }
+
   @Test
   public void writeEmptyIterator() throws Exception {
     final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org