You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2015/11/18 17:07:05 UTC
sqoop git commit: SQOOP-2396: Sqoop2: Race condition in purge/update
threads on Server shutdown
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 4f0e28625 -> 190b78fcb
SQOOP-2396: Sqoop2: Race condition in purge/update threads on Server shutdown
(Dian Fu via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/190b78fc
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/190b78fc
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/190b78fc
Branch: refs/heads/sqoop2
Commit: 190b78fcb83c4870d7dcb7f8af289047f44fefb8
Parents: 4f0e286
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Nov 18 08:06:49 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Nov 18 08:06:49 2015 -0800
----------------------------------------------------------------------
.../org/apache/sqoop/driver/JobManager.java | 65 +++++++++++++-------
1 file changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/190b78fc/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 15ca796..90ee541 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -140,6 +140,16 @@ public class JobManager implements Reconfigurable {
private UpdateThread updateThread = null;
/**
+ * Lock for purge thread.
+ */
+ private Object purgeThreadLock = new Object();
+
+ /**
+ * Lock for update thread.
+ */
+ private Object updateThreadLock = new Object();
+
+ /**
* Synchronization variable between threads.
*/
private boolean running;
@@ -196,20 +206,24 @@ public class JobManager implements Reconfigurable {
running = false;
- try {
- purgeThread.interrupt();
- purgeThread.join();
- } catch (InterruptedException e) {
- // TODO(jarcec): Do I want to wait until it actually finish here?
- LOG.error("Interrupted joining purgeThread");
+ synchronized(purgeThreadLock) {
+ try {
+ purgeThread.interrupt();
+ purgeThread.join();
+ } catch (InterruptedException e) {
+ // TODO(jarcec): Do I want to wait until it actually finish here?
+ LOG.error("Interrupted joining purgeThread");
+ }
}
- try {
- updateThread.interrupt();
- updateThread.join();
- } catch (InterruptedException e) {
- // TODO(jarcec): Do I want to wait until it actually finish here?
- LOG.error("Interrupted joining updateThread");
+ synchronized(updateThreadLock) {
+ try {
+ updateThread.interrupt();
+ updateThread.join();
+ } catch (InterruptedException e) {
+ // TODO(jarcec): Do I want to wait until it actually finish here?
+ LOG.error("Interrupted joining updateThread");
+ }
}
if (submissionEngine != null) {
@@ -763,11 +777,15 @@ public class JobManager implements Reconfigurable {
try {
LOG.info("Purging old submissions");
Date threshold = new Date((new Date()).getTime() - purgeThreshold);
- RepositoryManager.getInstance().getRepository()
- .purgeSubmissions(threshold);
+ synchronized(purgeThreadLock) {
+ RepositoryManager.getInstance().getRepository()
+ .purgeSubmissions(threshold);
+ }
Thread.sleep(purgeSleep);
} catch (InterruptedException e) {
LOG.debug("Purge thread interrupted", e);
+ } catch (SqoopException ex) {
+ LOG.error("Purge thread encountered exception", ex);
}
}
@@ -787,18 +805,21 @@ public class JobManager implements Reconfigurable {
try {
LOG.debug("Updating running submissions");
- // Let's get all running submissions from repository to check them out
- List<MSubmission> unfinishedSubmissions =
- RepositoryManager.getInstance().getRepository()
- .findUnfinishedSubmissions();
+ synchronized(updateThreadLock) {
+ // Let's get all running submissions from repository to check them out
+ List<MSubmission> unfinishedSubmissions =
+ RepositoryManager.getInstance().getRepository()
+ .findUnfinishedSubmissions();
- for (MSubmission submission : unfinishedSubmissions) {
- updateSubmission(submission);
+ for (MSubmission submission : unfinishedSubmissions) {
+ updateSubmission(submission);
+ }
}
-
Thread.sleep(updateSleep);
} catch (InterruptedException e) {
- LOG.debug("Purge thread interrupted", e);
+ LOG.debug("Update thread interrupted", e);
+ } catch (SqoopException ex) {
+ LOG.error("Update thread encountered exception", ex);
}
}