You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/06/23 18:08:36 UTC
spark git commit: [SPARK-8498] [TUNGSTEN] fix npe in errorhandling
path in unsafeshuffle writer
Repository: spark
Updated Branches:
refs/heads/master 6ceb16960 -> 0f92be5b5
[SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffle writer
Author: Holden Karau <ho...@pigscanfly.ca>
Closes #6918 from holdenk/SPARK-8498-fix-npe-in-errorhandling-path-in-unsafeshuffle-writer and squashes the following commits:
f807832 [Holden Karau] Log error if we can't throw it
855f9aa [Holden Karau] Spelling - not my strongest suite. Fix Propegates to Propagates.
039d620 [Holden Karau] Add missing closeandwriteoutput
30e558d [Holden Karau] go back to try/finally
e503b8c [Holden Karau] Improve the test to ensure we aren't masking the underlying exception
ae0b7a7 [Holden Karau] Fix the test
2e6abf7 [Holden Karau] Be more cautious when cleaning up during failed write and re-throw user exceptions
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f92be5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f92be5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f92be5b
Branch: refs/heads/master
Commit: 0f92be5b5f017b593bd29d4da7e89aad2b3adac2
Parents: 6ceb169
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Tue Jun 23 09:08:11 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Jun 23 09:08:11 2015 -0700
----------------------------------------------------------------------
.../spark/shuffle/unsafe/UnsafeShuffleWriter.java | 18 ++++++++++++++++--
.../shuffle/unsafe/UnsafeShuffleWriterSuite.java | 17 +++++++++++++++++
2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0f92be5b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
index ad7eb04..764578b 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -139,6 +139,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
+ // Keep track of success so we know if we ecountered an exception
+ // We do this rather than a standard try/catch/re-throw to handle
+ // generic throwables.
boolean success = false;
try {
while (records.hasNext()) {
@@ -147,8 +150,19 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
closeAndWriteOutput();
success = true;
} finally {
- if (!success) {
- sorter.cleanupAfterError();
+ if (sorter != null) {
+ try {
+ sorter.cleanupAfterError();
+ } catch (Exception e) {
+ // Only throw this error if we won't be masking another
+ // error.
+ if (success) {
+ throw e;
+ } else {
+ logger.error("In addition to a failure during writing, we failed during " +
+ "cleanup.", e);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0f92be5b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 83d1091..10c3eed 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -253,6 +253,23 @@ public class UnsafeShuffleWriterSuite {
createWriter(false).stop(false);
}
+ class PandaException extends RuntimeException {
+ }
+
+ @Test(expected=PandaException.class)
+ public void writeFailurePropagates() throws Exception {
+ class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
+ @Override public boolean hasNext() {
+ throw new PandaException();
+ }
+ @Override public Product2<Object, Object> next() {
+ return null;
+ }
+ }
+ final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
+ writer.write(new BadRecords());
+ }
+
@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org