You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/15 00:50:39 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-4969
Updated Branches:
refs/heads/trunk 08f21ed71 -> 11ae61f53
https://issues.apache.org/jira/browse/AMQ-4969
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11ae61f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11ae61f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11ae61f5
Branch: refs/heads/trunk
Commit: 11ae61f5398445c05da2b5dc0c32d2633921335e
Parents: 08f21ed
Author: Timothy Bish <ta...@gmai.com>
Authored: Tue Jan 14 12:23:52 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Tue Jan 14 18:48:40 2014 -0500
----------------------------------------------------------------------
.../kahadb/scheduler/JobSchedulerStoreImpl.java | 48 ++++++--------------
1 file changed, 15 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/11ae61f5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 40a027d..5934914 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -30,8 +30,11 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
@@ -43,13 +46,11 @@ import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
+public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore {
static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -59,7 +60,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
private File directory;
PageFile pageFile;
private Journal journal;
- LockFile lockFile;
protected AtomicLong journalSize = new AtomicLong(0);
private boolean failIfDatabaseIsLocked;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@@ -248,7 +248,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
}
IOHelper.mkdirs(this.directory);
- lock();
this.journal = new Journal();
this.journal.setDirectory(directory);
this.journal.setMaxFileLength(getJournalMaxFileLength());
@@ -302,10 +301,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
if (this.journal != null) {
journal.close();
}
- if (this.lockFile != null) {
- this.lockFile.unlock();
- }
- this.lockFile = null;
LOG.info(this + " stopped");
}
@@ -340,30 +335,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
return this.journal.write(payload, sync);
}
- private void lock() throws IOException {
- if (lockFile == null) {
- File lockFileName = new File(directory, "lock");
- lockFile = new LockFile(lockFileName, true);
- if (failIfDatabaseIsLocked) {
- lockFile.lock();
- } else {
- while (true) {
- try {
- lockFile.lock();
- break;
- } catch (IOException e) {
- LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
- + " seconds for the database to be unlocked. Reason: " + e);
- try {
- Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
- } catch (InterruptedException e1) {
- }
- }
- }
- }
- }
- }
-
PageFile getPageFile() {
this.pageFile.isLoaded();
return this.pageFile;
@@ -405,4 +376,15 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
public String toString() {
return "JobSchedulerStore:" + this.directory;
}
+
+ @Override
+ public Locker createDefaultLocker() throws IOException {
+ SharedFileLocker locker = new SharedFileLocker();
+ locker.setDirectory(this.getDirectory());
+ return locker;
+ }
+
+ @Override
+ public void init() throws Exception {
+ }
}