You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/11/18 17:07:05 UTC

sqoop git commit: SQOOP-2396: Sqoop2: Race condition in purge/update threads on Server shutdown

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 4f0e28625 -> 190b78fcb


SQOOP-2396: Sqoop2: Race condition in purge/update threads on Server shutdown

(Dian Fu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/190b78fc
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/190b78fc
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/190b78fc

Branch: refs/heads/sqoop2
Commit: 190b78fcb83c4870d7dcb7f8af289047f44fefb8
Parents: 4f0e286
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Nov 18 08:06:49 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Nov 18 08:06:49 2015 -0800

----------------------------------------------------------------------
 .../org/apache/sqoop/driver/JobManager.java     | 65 +++++++++++++-------
 1 file changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/190b78fc/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 15ca796..90ee541 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -140,6 +140,16 @@ public class JobManager implements Reconfigurable {
   private UpdateThread updateThread = null;
 
   /**
+   * Lock for purge thread.
+   */
+  private Object purgeThreadLock = new Object();
+
+  /**
+   * Lock for update thread.
+   */
+  private Object updateThreadLock = new Object();
+
+  /**
    * Synchronization variable between threads.
    */
   private boolean running;
@@ -196,20 +206,24 @@ public class JobManager implements Reconfigurable {
 
     running = false;
 
-    try {
-      purgeThread.interrupt();
-      purgeThread.join();
-    } catch (InterruptedException e) {
-      // TODO(jarcec): Do I want to wait until it actually finish here?
-      LOG.error("Interrupted joining purgeThread");
+    synchronized(purgeThreadLock) {
+      try {
+        purgeThread.interrupt();
+        purgeThread.join();
+      } catch (InterruptedException e) {
+        // TODO(jarcec): Do I want to wait until it actually finish here?
+        LOG.error("Interrupted joining purgeThread");
+      }
     }
 
-    try {
-      updateThread.interrupt();
-      updateThread.join();
-    } catch (InterruptedException e) {
-      // TODO(jarcec): Do I want to wait until it actually finish here?
-      LOG.error("Interrupted joining updateThread");
+    synchronized(updateThreadLock) {
+      try {
+        updateThread.interrupt();
+        updateThread.join();
+      } catch (InterruptedException e) {
+        // TODO(jarcec): Do I want to wait until it actually finish here?
+        LOG.error("Interrupted joining updateThread");
+      }
     }
 
     if (submissionEngine != null) {
@@ -763,11 +777,15 @@ public class JobManager implements Reconfigurable {
         try {
           LOG.info("Purging old submissions");
           Date threshold = new Date((new Date()).getTime() - purgeThreshold);
-          RepositoryManager.getInstance().getRepository()
-            .purgeSubmissions(threshold);
+          synchronized(purgeThreadLock) {
+            RepositoryManager.getInstance().getRepository()
+              .purgeSubmissions(threshold);
+          }
           Thread.sleep(purgeSleep);
         } catch (InterruptedException e) {
           LOG.debug("Purge thread interrupted", e);
+        } catch (SqoopException ex) {
+          LOG.error("Purge thread encountered exception", ex);
         }
       }
 
@@ -787,18 +805,21 @@ public class JobManager implements Reconfigurable {
         try {
           LOG.debug("Updating running submissions");
 
-          // Let's get all running submissions from repository to check them out
-          List<MSubmission> unfinishedSubmissions =
-            RepositoryManager.getInstance().getRepository()
-              .findUnfinishedSubmissions();
+          synchronized(updateThreadLock) {
+            // Let's get all running submissions from repository to check them out
+            List<MSubmission> unfinishedSubmissions =
+              RepositoryManager.getInstance().getRepository()
+                .findUnfinishedSubmissions();
 
-          for (MSubmission submission : unfinishedSubmissions) {
-            updateSubmission(submission);
+            for (MSubmission submission : unfinishedSubmissions) {
+              updateSubmission(submission);
+            }
           }
-
           Thread.sleep(updateSleep);
         } catch (InterruptedException e) {
-          LOG.debug("Purge thread interrupted", e);
+          LOG.debug("Update thread interrupted", e);
+        } catch (SqoopException ex) {
+          LOG.error("Update thread encountered exception", ex);
         }
       }