You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by gw...@apache.org on 2015/05/13 19:14:54 UTC

sqoop git commit: SQOOP-2151: Sqoop2: Sqoop mapreduce job gets into deadlock when loader throws an exception

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 b1c742b63 -> ad632361b


SQOOP-2151: Sqoop2: Sqoop mapreduce job gets into deadlock when loader throws an exception

(Ted Malaska via Gwen Shapira)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ad632361
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ad632361
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ad632361

Branch: refs/heads/sqoop2
Commit: ad632361bb18b9bd5ef95ad0d7b3ae7c71c544b4
Parents: b1c742b
Author: Gwen Shapira <cs...@gmail.com>
Authored: Wed May 13 20:13:03 2015 +0300
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed May 13 20:13:03 2015 +0300

----------------------------------------------------------------------
 .../job/mr/SqoopOutputFormatLoadExecutor.java   | 16 +++++++++++
 .../mr/TestSqoopOutputFormatLoadExecutor.java   | 29 +++++++++++++-------
 2 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad632361/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 3c091a2..c9d6f10 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -137,6 +137,14 @@ public class SqoopOutputFormatLoadExecutor {
       // In almost all cases, the exception will be SqoopException,
       // because all exceptions are caught and propagated as
       // SqoopExceptions
+
+      //There are race conditions with exceptions where the free sema is
+      //no released.  So sense we are in single threaded mode at this point
+      //we can ask if there are availablePermits and release if needed
+      if (free.availablePermits() == 0) {
+        free.release();
+      }
+
       Throwable t = ex.getCause();
       if (t instanceof SqoopException) {
         throw (SqoopException) t;
@@ -144,6 +152,14 @@ public class SqoopOutputFormatLoadExecutor {
       //In the rare case, it was not a SqoopException
       Throwables.propagate(t);
     } catch (Exception ex) {
+
+      //There are race conditions with exceptions where the free sema is
+      //no released.  So sense we are in single threaded mode at this point
+      //we can ask if there are availablePermits and release if needed
+      if (free.availablePermits() == 0) {
+        free.release();
+      }
+
       throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ad632361/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 71c98db..3208e8a 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -193,22 +193,31 @@ public class TestSqoopOutputFormatLoadExecutor {
     SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(jobContextMock,
         GoodContinuousLoader.class.getName(), getIDF(), getMatcher());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
-    SqoopWritable writable = new SqoopWritable(dataFormat);
-    for (int i = 0; i < 10; i++) {
-      StringBuilder builder = new StringBuilder();
-      for (int count = 0; count < 100; count++) {
-        builder.append(String.valueOf(count));
-        if (count != 99) {
-          builder.append(",");
+
+    boolean exceptionThrown = false;
+
+    try {
+      IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+      SqoopWritable writable = new SqoopWritable(dataFormat);
+      for (int i = 0; i < 10; i++) {
+        StringBuilder builder = new StringBuilder();
+        for (int count = 0; count < 100; count++) {
+          builder.append(String.valueOf(count));
+          if (count != 99) {
+            builder.append(",");
+          }
         }
+        dataFormat.setCSVTextData(builder.toString());
+        writer.write(writable, null);
       }
-      dataFormat.setCSVTextData(builder.toString());
-      writer.write(writable, null);
+    } catch (Throwable t) {
+      t.printStackTrace();
+      exceptionThrown = true;
     }
     writer.close(null);
     verify(jobContextMock, times(1)).getConfiguration();
     verify(jobContextMock, times(1)).getCounter(SqoopCounters.ROWS_WRITTEN);
+    Assert.assertFalse(exceptionThrown, "Exception Thrown during writing");
   }
 
   @Test(expectedExceptions = SqoopException.class)