You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/08/30 12:47:50 UTC

svn commit: r1378881 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apa...

Author: dejanb
Date: Thu Aug 30 10:47:49 2012
New Revision: 1378881

URL: http://svn.apache.org/viewvc?rev=1378881&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4005 - pluggable lockers; introduce new API and refactor current KahaDB and JDBC solutions

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java
      - copied, changed from r1378859, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java?rev=1378881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java Thu Aug 30 10:47:49 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.Locker;
+import org.apache.activemq.util.ServiceSupport;
+
+import java.io.IOException;
+
+/**
+ * Convenience base class for {@link Locker} implementations.
+ * <p>
+ * Keeps the values injected through the {@link Locker} setters (lock name,
+ * fail-if-locked flag and sleep interval between acquire attempts) in protected
+ * fields for subclasses, and supplies a default {@link #keepAlive()}.
+ */
+public abstract class AbstractLocker extends ServiceSupport implements Locker {
+
+    /** Default delay between lock acquire attempts, in milliseconds. */
+    public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 10 * 1000;
+
+    /** Name of the lock to use; set through {@link #setName(String)}. */
+    protected String name;
+    /** When {@code true} the locker should fail at once if the lock is already held instead of waiting. */
+    protected boolean failIfLocked = false;
+    /** Delay between lock acquire attempts, in milliseconds. */
+    protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
+
+    /**
+     * Default keep-alive check for lockers that have nothing to renew.
+     * <p>
+     * Per the {@link Locker} contract the broker should be terminated when this
+     * method returns {@code false}, so the default reports the lock as still
+     * held; lockers that can lose their lock are expected to override it.
+     *
+     * @return always {@code true}
+     * @throws IOException never thrown by this default implementation
+     */
+    @Override
+    public boolean keepAlive() throws IOException {
+        return true;
+    }
+
+    /**
+     * @param lockAcquireSleepInterval delay between lock acquire attempts, in milliseconds
+     */
+    @Override
+    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
+        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
+    }
+
+    /**
+     * @param name the name of the lock to use
+     */
+    @Override
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @param failIfLocked {@code true} to fail immediately if the lock is held, {@code false} to wait for it
+     */
+    @Override
+    public void setFailIfLocked(boolean failIfLocked) {
+        this.failIfLocked = failIfLocked;
+    }
+}

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java (from r1378859, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java&r1=1378859&r2=1378881&rev=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java Thu Aug 30 10:47:49 2012
@@ -14,27 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.jdbc;
-
-import java.io.IOException;
+package org.apache.activemq.broker;
 
 import org.apache.activemq.Service;
+import org.apache.activemq.store.PersistenceAdapter;
+
+import java.io.IOException;
 
 /**
- * Represents some kind of lock service to ensure that a broker is the only master
- * 
- * 
+ * Represents a lock service to ensure that a broker is the only master
  */
-public interface DatabaseLocker extends Service {
+public interface Locker extends Service {
 
     /**
-     * allow the injection of a jdbc persistence adapter
-     * @param adapter the persistence adapter to use
-     * @throws IOException 
-     */
-    void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException;
-    
-    /**
      * Used by a timer to keep alive the lock.
      * If the method returns false the broker should be terminated
      * if an exception is thrown, the lock state cannot be determined
@@ -43,8 +35,32 @@ public interface DatabaseLocker extends 
 
     /**
      * set the delay interval in milliseconds between lock acquire attempts
+     *
      * @param lockAcquireSleepInterval the sleep interval in miliseconds
      */
     void setLockAcquireSleepInterval(long lockAcquireSleepInterval);
-    
+
+    /**
+     * Set the name of the lock to use.
+     */
+    public void setName(String name);
+
+    /**
+     * Specify whether to fail immediately if the lock is already held.  When set, the Locker must throw an
+     * IOException immediately upon detecting the lock is already held.
+     *
+     * @param failIfLocked: true => fail immediately if the lock is held; false => block until the lock can be obtained
+     *                      (default).
+     */
+    public void setFailIfLocked(boolean failIfLocked);
+
+
+    /**
+     * Configure the locker with the persistence adapter currently used
+     *
+     * @param persistenceAdapter
+     * @throws IOException
+     */
+    public void configure(PersistenceAdapter persistenceAdapter) throws IOException;
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java Thu Aug 30 10:47:49 2012
@@ -22,9 +22,10 @@ import org.apache.activemq.Service;
 
 /**
  * Represents some kind of lock service to ensure that a broker is the only master
- * 
- * 
+ *
+ * @deprecated As of 5.7.0, use more general {@link org.apache.activemq.broker.Locker} instead
  */
+@Deprecated
 public interface DatabaseLocker extends Service {
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java Thu Aug 30 10:47:49 2012
@@ -24,7 +24,10 @@ import java.sql.SQLFeatureNotSupportedEx
 
 import javax.sql.DataSource;
 
+import org.apache.activemq.broker.AbstractLocker;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.util.Handler;
+import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,12 +38,11 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean element="database-locker"
  * 
  */
-public class DefaultDatabaseLocker implements DatabaseLocker {
+public class DefaultDatabaseLocker extends AbstractLocker {
     public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000;
     private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
     protected DataSource dataSource;
     protected Statements statements;
-    protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
 
     protected PreparedStatement lockCreateStatement;
     protected PreparedStatement lockUpdateStatement;
@@ -48,20 +50,16 @@ public class DefaultDatabaseLocker imple
     protected boolean stopping;
     protected Handler<Exception> exceptionHandler;
     protected int queryTimeout = 10;
-    
-    public DefaultDatabaseLocker() {
-    }
-    
-    public DefaultDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
-        setPersistenceAdapter(persistenceAdapter);
-    }
 
-    public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
-        this.dataSource = adapter.getLockDataSource();
-        this.statements = adapter.getStatements();
+    public void configure(PersistenceAdapter adapter) throws IOException {
+        if (adapter instanceof JDBCPersistenceAdapter) {
+            this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
+            this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
+        }
+        lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
     }
     
-    public void start() throws Exception {
+    public void doStart() throws Exception {
         stopping = false;
 
         LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
@@ -134,7 +132,7 @@ public class DefaultDatabaseLocker imple
         LOG.info("Becoming the master on dataSource: " + dataSource);
     }
 
-    public void stop() throws Exception {
+    public void doStop(ServiceStopper stopper) throws Exception {
         stopping = true;
         try {
             if (lockCreateStatement != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java Thu Aug 30 10:47:49 2012
@@ -17,6 +17,8 @@
 package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
+
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.util.DefaultIOExceptionHandler;
 
 /**
@@ -34,7 +36,7 @@ public class JDBCIOExceptionHandler exte
         boolean hasLock = true;
         if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
             JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) broker.getPersistenceAdapter();
-            DatabaseLocker locker = jdbcPersistenceAdapter.getDatabaseLocker();
+            Locker locker = jdbcPersistenceAdapter.getDatabaseLocker();
             if (locker != null) {
                 try {
                     if (!locker.keepAlive()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Aug 30 10:47:49 2012
@@ -41,6 +41,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
@@ -89,7 +90,7 @@ public class JDBCPersistenceAdapter exte
     private boolean useDatabaseLock = true;
     private long lockKeepAlivePeriod = 1000*30;
     private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
-    private DatabaseLocker databaseLocker;
+    private Locker locker;
     private boolean createTablesOnStartup = true;
     private DataSource lockDataSource;
     private int transactionIsolation;
@@ -299,7 +300,7 @@ public class JDBCPersistenceAdapter exte
         }
 
         if (isUseDatabaseLock()) {
-            DatabaseLocker service = getDatabaseLocker();
+            Locker service = getDatabaseLocker();
             if (service == null) {
                 LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
             } else {
@@ -340,7 +341,7 @@ public class JDBCPersistenceAdapter exte
         }
         
         // do not shutdown clockDaemon as it may kill the thread initiating shutdown
-        DatabaseLocker service = getDatabaseLocker();
+        Locker service = getDatabaseLocker();
         if (service != null) {
             service.stop();
         }
@@ -392,21 +393,40 @@ public class JDBCPersistenceAdapter exte
         return adapter;
     }
 
-    public DatabaseLocker getDatabaseLocker() throws IOException {
-        if (databaseLocker == null && isUseDatabaseLock()) {
-            setDatabaseLocker(loadDataBaseLocker());
+    /**
+     *
+     * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
+     */
+    @Deprecated
+    public Locker getDatabaseLocker() throws IOException {
+        return getLocker();
+    }
+
+    public Locker getLocker() throws IOException {
+        if (locker == null && isUseDatabaseLock()) {
+            setLocker(loadDataBaseLocker());
         }
-        return databaseLocker;
+        return locker;
+    }
+
+    /**
+     * Sets the database locker strategy to use to lock the database on startup
+     * @throws IOException
+     *
+     * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
+     */
+    public void setDatabaseLocker(Locker locker) throws IOException {
+        setLocker(locker);
     }
 
     /**
      * Sets the database locker strategy to use to lock the database on startup
      * @throws IOException 
      */
-    public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
-        databaseLocker = locker;
-        databaseLocker.setPersistenceAdapter(this);
-        databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
+    public void setLocker(Locker locker) throws IOException {
+        this.locker = locker;
+        locker.configure(this);
+        locker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
     }
 
     public DataSource getLockDataSource() throws IOException {
@@ -616,7 +636,7 @@ public class JDBCPersistenceAdapter exte
     protected void databaseLockKeepAlive() {
         boolean stop = false;
         try {
-            DatabaseLocker locker = getDatabaseLocker();
+            Locker locker = getDatabaseLocker();
             if (locker != null) {
                 if (!locker.keepAlive()) {
                     stop = true;
@@ -640,8 +660,8 @@ public class JDBCPersistenceAdapter exte
         }
     }
 
-    protected DatabaseLocker loadDataBaseLocker() throws IOException {
-        DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");       
+    protected Locker loadDataBaseLocker() throws IOException {
+        DefaultDatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
         if (locker == null) {
             locker = new DefaultDatabaseLocker();
             LOG.debug("Using default JDBC Locker: " + locker);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java Thu Aug 30 10:47:49 2012
@@ -25,7 +25,11 @@ import java.sql.Timestamp;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 import javax.sql.DataSource;
+
+import org.apache.activemq.broker.AbstractLocker;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,12 +40,11 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean element="lease-database-locker"
  * 
  */
-public class LeaseDatabaseLocker implements DatabaseLocker {
+public class LeaseDatabaseLocker extends AbstractLocker {
     private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
     public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 5000;
     protected DataSource dataSource;
     protected Statements statements;
-    protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
 
     protected boolean stopping;
     protected int maxAllowableDiffFromDBTime = 0;
@@ -51,13 +54,16 @@ public class LeaseDatabaseLocker impleme
     JDBCPersistenceAdapter persistenceAdapter;
 
 
-    public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
-        this.dataSource = adapter.getLockDataSource();
-        this.statements = adapter.getStatements();
-        this.persistenceAdapter = adapter;
+    public void configure(PersistenceAdapter adapter) throws IOException {
+        if (adapter instanceof JDBCPersistenceAdapter) {
+            this.persistenceAdapter = (JDBCPersistenceAdapter)adapter;
+            this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
+            this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
+        }
+        lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
     }
     
-    public void start() throws Exception {
+    public void doStart() throws Exception {
         stopping = false;
 
         LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the Master broker");
@@ -176,7 +182,7 @@ public class LeaseDatabaseLocker impleme
         return result;
     }
 
-    public void stop() throws Exception {
+    public void doStop(ServiceStopper stopper) throws Exception {
         releaseLease();
         stopping = true;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java Thu Aug 30 10:47:49 2012
@@ -36,15 +36,8 @@ import org.slf4j.LoggerFactory;
 public class TransactDatabaseLocker extends DefaultDatabaseLocker {
     private static final Logger LOG = LoggerFactory.getLogger(TransactDatabaseLocker.class);
     
-    public TransactDatabaseLocker() {
-    }
-    
-    public TransactDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
-        setPersistenceAdapter(persistenceAdapter);
-    }
-    
     @Override
-    public void start() throws Exception {
+    public void doStart() throws Exception {
         stopping = false;
 
         LOG.info("Attempting to acquire the exclusive lock to become the Master broker");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu Aug 30 10:47:49 2012
@@ -27,6 +27,7 @@ import org.apache.activemq.command.Produ
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
@@ -49,6 +50,7 @@ import java.util.Set;
  */
 public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
     private final KahaDBStore letter = new KahaDBStore();
+    private Locker locker;
 
     /**
      * @param context
@@ -189,6 +191,7 @@ public class KahaDBPersistenceAdapter im
      * @see org.apache.activemq.Service#start()
      */
     public void start() throws Exception {
+        getLocker().start();
         this.letter.start();
     }
 
@@ -197,7 +200,11 @@ public class KahaDBPersistenceAdapter im
      * @see org.apache.activemq.Service#stop()
      */
     public void stop() throws Exception {
-        this.letter.stop();
+        try {
+            this.letter.stop();
+        } finally {
+            getLocker().stop();
+        }
     }
 
     /**
@@ -586,6 +593,24 @@ public class KahaDBPersistenceAdapter im
         return rc;
     }
 
+    public void setLocker(Locker locker) {
+        this.locker = locker;
+    }
+
+    protected Locker getLocker() throws IOException {
+        if (this.locker == null) {
+            this.locker = createDefaultLocker();
+        }
+        return this.locker;
+    }
+
+    protected Locker createDefaultLocker() throws IOException {
+        SharedFileLocker locker = new SharedFileLocker();
+        locker.configure(this);
+        locker.setLockAcquireSleepInterval(getDatabaseLockedWaitDelay());
+        return locker;
+    }
+
     @Override
     public String toString() {
         String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Aug 30 10:47:49 2012
@@ -368,49 +368,10 @@ public abstract class MessageDatabase ex
         }
     }
 
-    private void lock() throws IOException {
-
-        if (lockFile == null) {
-            File lockFileName = new File(directory, "lock");
-            lockFile = new LockFile(lockFileName, true);
-            if (failIfDatabaseIsLocked) {
-                lockFile.lock();
-            } else {
-                boolean locked = false;
-                while ((!isStopped()) && (!isStopping())) {
-                    try {
-                        lockFile.lock();
-                        locked = true;
-                        break;
-                    } catch (IOException e) {
-                        LOG.info("Database "
-                                + lockFileName
-                                + " is locked... waiting "
-                                + (getDatabaseLockedWaitDelay() / 1000)
-                                + " seconds for the database to be unlocked. Reason: "
-                                + e);
-                        try {
-                            Thread.sleep(getDatabaseLockedWaitDelay());
-                        } catch (InterruptedException e1) {
-                        }
-                    }
-                }
-                if (!locked) {
-                    throw new IOException("attempt to obtain lock aborted due to shutdown");
-                }
-            }
-        }
-    }
-
-    // for testing
-    public LockFile getLockFile() {
-        return lockFile;
-    }
-
     public void load() throws IOException {
         this.indexLock.writeLock().lock();
+        IOHelper.mkdirs(directory);
         try {
-            lock();
             if (deleteAllMessages) {
                 getJournal().start();
                 getJournal().delete();
@@ -430,30 +391,25 @@ public abstract class MessageDatabase ex
 
     public void close() throws IOException, InterruptedException {
         if( opened.compareAndSet(true, false)) {
+            this.indexLock.writeLock().lock();
             try {
-                this.indexLock.writeLock().lock();
-                try {
-                    if (metadata.page != null) {
-                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                            public void execute(Transaction tx) throws IOException {
-                                checkpointUpdate(tx, true);
-                            }
-                        });
-                    }
-                    pageFile.unload();
-                    metadata = new Metadata();
-                } finally {
-                    this.indexLock.writeLock().unlock();
-                }
-                journal.close();
-                synchronized (checkpointThreadLock) {
-                    if (checkpointThread != null) {
-                        checkpointThread.join();
-                    }
+                if (metadata.page != null) {
+                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                        public void execute(Transaction tx) throws IOException {
+                            checkpointUpdate(tx, true);
+                        }
+                    });
                 }
+                pageFile.unload();
+                metadata = new Metadata();
             } finally {
-                lockFile.unlock();
-                lockFile=null;
+                this.indexLock.writeLock().unlock();
+            }
+            journal.close();
+            synchronized (checkpointThreadLock) {
+                if (checkpointThread != null) {
+                    checkpointThread.join();
+                }
             }
         }
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java?rev=1378881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java Thu Aug 30 10:47:49 2012
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.broker.AbstractLocker;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.kahadb.util.LockFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Represents an exclusive lock on a database to avoid multiple brokers running
+ * against the same logical database.
+ *
+ * @org.apache.xbean.XBean element="shared-file-locker"
+ *
+ */
+public class SharedFileLocker extends AbstractLocker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SharedFileLocker.class);
+
+    /** The held lock; non-null only while the lock is actually held. */
+    private LockFile lockFile;
+    /** Directory in which the "lock" file is created. */
+    protected File directory = MessageDatabase.DEFAULT_DIRECTORY;
+
+    /**
+     * Acquires the lock on the file named "lock" inside {@link #getDirectory()}.
+     * <p>
+     * With {@code failIfLocked} set, a single attempt is made and its failure is
+     * propagated. Otherwise the attempt is repeated every
+     * {@code lockAcquireSleepInterval} milliseconds until it succeeds or the
+     * service is stopping or stopped.
+     * <p>
+     * The {@code lockFile} field is assigned only once the lock is held, so a
+     * failed start can be retried instead of being silently skipped.
+     *
+     * @throws IOException if the lock cannot be obtained while {@code failIfLocked}
+     *                     is set, or if waiting for the lock is aborted by a shutdown
+     */
+    @Override
+    public void doStart() throws Exception {
+        if (lockFile != null) {
+            // lock already held by this locker
+            return;
+        }
+        File lockFileName = new File(directory, "lock");
+        LockFile candidate = new LockFile(lockFileName, true);
+        if (failIfLocked) {
+            candidate.lock();
+            lockFile = candidate;
+            return;
+        }
+        while ((!isStopped()) && (!isStopping())) {
+            try {
+                candidate.lock();
+                lockFile = candidate;
+                return;
+            } catch (IOException e) {
+                LOG.info("Database "
+                        + lockFileName
+                        + " is locked... waiting "
+                        + (lockAcquireSleepInterval / 1000)
+                        + " seconds for the database to be unlocked. Reason: "
+                        + e);
+                try {
+                    Thread.sleep(lockAcquireSleepInterval);
+                } catch (InterruptedException e1) {
+                    // NOTE(review): interrupt is swallowed (behaviour kept as-is); the loop
+                    // condition re-checks the stop state before the next attempt.
+                }
+            }
+        }
+        throw new IOException("attempt to obtain lock aborted due to shutdown");
+    }
+
+    /**
+     * Releases the lock if it is held. Does nothing when the lock is not
+     * held, so stopping a locker that never acquired the lock does not
+     * dereference a null {@code lockFile}.
+     */
+    @Override
+    public void doStop(ServiceStopper stopper) throws Exception {
+        if (lockFile != null) {
+            lockFile.unlock();
+            lockFile = null;
+        }
+    }
+
+    /**
+     * @return the directory in which the lock file is created
+     */
+    public File getDirectory() {
+        return directory;
+    }
+
+    /**
+     * @param directory the directory in which the lock file is created
+     */
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    /**
+     * Takes the lock directory from the given persistence adapter.
+     *
+     * @param persistenceAdapter the adapter whose directory will hold the lock file
+     * @throws IOException declared by the {@link org.apache.activemq.broker.Locker} contract
+     */
+    @Override
+    public void configure(PersistenceAdapter persistenceAdapter) throws IOException {
+        this.setDirectory(persistenceAdapter.getDirectory());
+    }
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java Thu Aug 30 10:47:49 2012
@@ -55,11 +55,11 @@ public class LeaseDatabaseLockerTest {
 
         LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
         brokerService.setBrokerName("First");
-        lockerA.setPersistenceAdapter(jdbc);
+        lockerA.configure(jdbc);
 
         final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
         brokerService.setBrokerName("Second");
-        lockerB.setPersistenceAdapter(jdbc);
+        lockerB.configure(jdbc);
         final AtomicBoolean blocked = new AtomicBoolean(true);
 
         final Connection connection = dataSource.getConnection();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java?rev=1378881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java Thu Aug 30 10:47:49 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+
+public class CustomLockerTest extends TestCase { // smoke test: builds a broker from the shared.xml xbean config
+
+    public void testCustomLocker() throws Exception { // no assertions; passes if no call below throws
+        BrokerService broker = BrokerFactory.createBroker("xbean:org/apache/activemq/store/kahadb/shared.xml");
+        broker.waitUntilStarted(); // NOTE(review): start() is never called here; presumably the xbean factory starts the broker — confirm
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+}

Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml?rev=1378881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml Thu Aug 30 10:47:49 2012
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:amq="http://activemq.apache.org/schema/core"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    <!-- normal ActiveMQ XML config which is less verbose & can be validated -->
+    <amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
+                useLoggingForShutdownErrors="true" useJmx="true"
+                persistent="true" vmConnectorURI="vm://javacoola"
+                useShutdownHook="false" deleteAllMessagesOnStartup="true">
+
+        <amq:persistenceAdapter>
+            <amq:kahaDB directory = "target/activemq-data">
+                <amq:locker>
+                    <amq:shared-file-locker lockAcquireSleepInterval="5000"/>
+                </amq:locker>
+            </amq:kahaDB>
+        </amq:persistenceAdapter>
+
+        <amq:systemUsage>
+            <amq:systemUsage>
+                <amq:memoryUsage>
+                    <amq:memoryUsage limit="10 mb" percentUsageMinDelta="20"/>
+                </amq:memoryUsage>
+                <amq:storeUsage>
+                    <amq:storeUsage limit="1 gb" name="foo"/>
+                </amq:storeUsage>
+                <amq:tempUsage>
+                    <amq:tempUsage limit="100 mb"/>
+                </amq:tempUsage>
+            </amq:systemUsage>
+        </amq:systemUsage>
+
+        <amq:transportConnectors>
+            <amq:transportConnector uri="tcp://localhost:61635"/>
+        </amq:transportConnectors>
+
+    </amq:broker>
+
+</beans>
\ No newline at end of file