You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by gw...@apache.org on 2015/05/13 19:14:54 UTC
sqoop git commit: SQOOP-2151: Sqoop2: Sqoop mapreduce job gets into
deadlock when loader throws an exception
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 b1c742b63 -> ad632361b
SQOOP-2151: Sqoop2: Sqoop mapreduce job gets into deadlock when loader throws an exception
(Ted Malaska via Gwen Shapira)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ad632361
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ad632361
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ad632361
Branch: refs/heads/sqoop2
Commit: ad632361bb18b9bd5ef95ad0d7b3ae7c71c544b4
Parents: b1c742b
Author: Gwen Shapira <cs...@gmail.com>
Authored: Wed May 13 20:13:03 2015 +0300
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed May 13 20:13:03 2015 +0300
----------------------------------------------------------------------
.../job/mr/SqoopOutputFormatLoadExecutor.java | 16 +++++++++++
.../mr/TestSqoopOutputFormatLoadExecutor.java | 29 +++++++++++++-------
2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad632361/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 3c091a2..c9d6f10 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -137,6 +137,14 @@ public class SqoopOutputFormatLoadExecutor {
// In almost all cases, the exception will be SqoopException,
// because all exceptions are caught and propagated as
// SqoopExceptions
+
+ //There are race conditions with exceptions where the free semaphore is
+ //not released. Since we are in single-threaded mode at this point,
+ //we can check availablePermits and release a permit if none is available
+ if (free.availablePermits() == 0) {
+ free.release();
+ }
+
Throwable t = ex.getCause();
if (t instanceof SqoopException) {
throw (SqoopException) t;
@@ -144,6 +152,14 @@ public class SqoopOutputFormatLoadExecutor {
//In the rare case, it was not a SqoopException
Throwables.propagate(t);
} catch (Exception ex) {
+
+ //There are race conditions with exceptions where the free semaphore is
+ //not released. Since we are in single-threaded mode at this point,
+ //we can check availablePermits and release a permit if none is available
+ if (free.availablePermits() == 0) {
+ free.release();
+ }
+
throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad632361/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 71c98db..3208e8a 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -193,22 +193,31 @@ public class TestSqoopOutputFormatLoadExecutor {
SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock,
GoodContinuousLoader.class.getName(), getIDF(), getMatcher());
RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
- IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
- SqoopWritable writable = new SqoopWritable(dataFormat);
- for (int i = 0; i < 10; i++) {
- StringBuilder builder = new StringBuilder();
- for (int count = 0; count < 100; count++) {
- builder.append(String.valueOf(count));
- if (count != 99) {
- builder.append(",");
+
+ boolean exceptionThrown = false;
+
+ try {
+ IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+ SqoopWritable writable = new SqoopWritable(dataFormat);
+ for (int i = 0; i < 10; i++) {
+ StringBuilder builder = new StringBuilder();
+ for (int count = 0; count < 100; count++) {
+ builder.append(String.valueOf(count));
+ if (count != 99) {
+ builder.append(",");
+ }
}
+ dataFormat.setCSVTextData(builder.toString());
+ writer.write(writable, null);
}
- dataFormat.setCSVTextData(builder.toString());
- writer.write(writable, null);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ exceptionThrown = true;
}
writer.close(null);
verify(jobContextMock, times(1)).getConfiguration();
verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN);
+ Assert.assertFalse(exceptionThrown, "Exception Thrown during writing");
}
@Test(expectedExceptions = SqoopException.class)