You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/15 00:50:39 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4969

Updated Branches:
  refs/heads/trunk 08f21ed71 -> 11ae61f53


https://issues.apache.org/jira/browse/AMQ-4969

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/11ae61f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/11ae61f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/11ae61f5

Branch: refs/heads/trunk
Commit: 11ae61f5398445c05da2b5dc0c32d2633921335e
Parents: 08f21ed
Author: Timothy Bish <ta...@gmai.com>
Authored: Tue Jan 14 12:23:52 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Tue Jan 14 18:48:40 2014 -0500

----------------------------------------------------------------------
 .../kahadb/scheduler/JobSchedulerStoreImpl.java | 48 ++++++--------------
 1 file changed, 15 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/11ae61f5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 40a027d..5934914 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -30,8 +30,11 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.broker.scheduler.JobScheduler;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.store.SharedFileLocker;
 import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
@@ -43,13 +46,11 @@ import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LockFile;
 import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
+public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore {
     static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
@@ -59,7 +60,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
     private File directory;
     PageFile pageFile;
     private Journal journal;
-    LockFile lockFile;
     protected AtomicLong journalSize = new AtomicLong(0);
     private boolean failIfDatabaseIsLocked;
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@@ -248,7 +248,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
             this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
         }
         IOHelper.mkdirs(this.directory);
-        lock();
         this.journal = new Journal();
         this.journal.setDirectory(directory);
         this.journal.setMaxFileLength(getJournalMaxFileLength());
@@ -302,10 +301,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
         if (this.journal != null) {
             journal.close();
         }
-        if (this.lockFile != null) {
-            this.lockFile.unlock();
-        }
-        this.lockFile = null;
         LOG.info(this + " stopped");
     }
 
@@ -340,30 +335,6 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
         return this.journal.write(payload, sync);
     }
 
-    private void lock() throws IOException {
-        if (lockFile == null) {
-            File lockFileName = new File(directory, "lock");
-            lockFile = new LockFile(lockFileName, true);
-            if (failIfDatabaseIsLocked) {
-                lockFile.lock();
-            } else {
-                while (true) {
-                    try {
-                        lockFile.lock();
-                        break;
-                    } catch (IOException e) {
-                        LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
-                            + " seconds for the database to be unlocked. Reason: " + e);
-                        try {
-                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
-                        } catch (InterruptedException e1) {
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     PageFile getPageFile() {
         this.pageFile.isLoaded();
         return this.pageFile;
@@ -405,4 +376,15 @@ public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedule
     public String toString() {
         return "JobSchedulerStore:" + this.directory;
     }
+
+    @Override
+    public Locker createDefaultLocker() throws IOException {
+        SharedFileLocker locker = new SharedFileLocker();
+        locker.setDirectory(this.getDirectory());
+        return locker;
+    }
+
+    @Override
+    public void init() throws Exception {
+    }
 }