You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/09/11 15:17:40 UTC

svn commit: r1383400 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/journal/ main/java/org/apache/activemq/store/kahadb/ main/java/org/apache/a...

Author: dejanb
Date: Tue Sep 11 13:17:39 2012
New Revision: 1383400

URL: http://svn.apache.org/viewvc?rev=1383400&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4005 - introducing Lockable interface and LockableServiceSupport for easier dealing with locks

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java
      - copied, changed from r1383370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java
      - copied, changed from r1383370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java Tue Sep 11 13:17:39 2012
@@ -31,7 +31,7 @@ public abstract class AbstractLocker ext
 
     @Override
     public boolean keepAlive() throws IOException {
-        return false;
+        return true;
     }
 
     @Override

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java (from r1383370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java&r1=1383370&r2=1383400&rev=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java Tue Sep 11 13:17:39 2012
@@ -16,36 +16,42 @@
  */
 package org.apache.activemq.broker;
 
-import org.apache.activemq.broker.Locker;
-import org.apache.activemq.util.ServiceSupport;
-
 import java.io.IOException;
 
-public abstract class AbstractLocker extends ServiceSupport implements Locker {
+/**
+ * A lockable broker resource. Uses {@link Locker} to guarantee that only single instance is running
+ *
+ */
+public interface Lockable {
 
-    public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 10 * 1000;
+    /**
+     * Turn locking on/off on the resource
+     *
+     * @param useLock
+     */
+    public void setUseLock(boolean useLock);
+
+    /**
+     * Create a default locker
+     *
+     * @return default locker
+     * @throws IOException
+     */
+    public Locker createDefaultLocker() throws IOException;
+
+    /**
+     * Set locker to be used
+     *
+     * @param locker
+     * @throws IOException
+     */
+    public void setLocker(Locker locker) throws IOException;
+
+    /**
+     * Period (in milliseconds) on which {@link org.apache.activemq.broker.Locker#keepAlive()} should be checked
+     *
+     * @param lockKeepAlivePeriod
+     */
+    public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
 
-    protected String name;
-    protected boolean failIfLocked = false;
-    protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
-
-    @Override
-    public boolean keepAlive() throws IOException {
-        return false;
-    }
-
-    @Override
-    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
-        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
-    }
-
-    @Override
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public void setFailIfLocked(boolean failIfLocked) {
-        this.failIfLocked = failIfLocked;
-    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java?rev=1383400&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java Tue Sep 11 13:17:39 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helper class for working with services that requires locking
+ */
+public abstract class LockableServiceSupport extends ServiceSupport implements Lockable, BrokerServiceAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class);
+    boolean useLock = true;
+    Locker locker;
+    long lockKeepAlivePeriod = 0;
+    private ScheduledFuture<?> keepAliveTicket;
+    private ScheduledThreadPoolExecutor clockDaemon;
+    private BrokerService brokerService;
+
+    /**
+     * Initialize resources before locking
+     *
+     * @throws Exception
+     */
+    abstract public void init() throws Exception;
+
+    @Override
+    public void setUseLock(boolean useLock) {
+        this.useLock = useLock;
+    }
+
+    @Override
+    public void setLocker(Locker locker) throws IOException {
+        this.locker = locker;
+        if (this instanceof PersistenceAdapter) {
+            this.locker.configure((PersistenceAdapter)this);
+        }
+    }
+
+    public Locker getLocker() throws IOException {
+        if (this.locker == null) {
+            this.locker = createDefaultLocker();
+        }
+        return this.locker;
+    }
+
+    @Override
+    public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
+        this.lockKeepAlivePeriod = lockKeepAlivePeriod;
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        init();
+        if (useLock) {
+            if (getLocker() == null) {
+                LOG.warn("No locker configured");
+            } else {
+                getLocker().start();
+                if (lockKeepAlivePeriod > 0) {
+                    keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
+                        public void run() {
+                            keepLockAlive();
+                        }
+                    }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
+                }
+                if (brokerService != null) {
+                    brokerService.getBroker().nowMasterBroker();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void postStop(ServiceStopper stopper) throws Exception {
+        if (useLock) {
+            if (keepAliveTicket != null) {
+                keepAliveTicket.cancel(false);
+                keepAliveTicket = null;
+            }
+            if (locker != null) {
+                getLocker().stop();
+            }
+            ThreadPoolUtils.shutdown(clockDaemon);
+        }
+    }
+
+    protected void keepLockAlive() {
+        boolean stop = false;
+        try {
+            Locker locker = getLocker();
+            if (locker != null) {
+                if (!locker.keepAlive()) {
+                    stop = true;
+                }
+            }
+        } catch (IOException e) {
+            LOG.warn("locker keepalive resulted in: " + e, e);
+        }
+        if (stop) {
+            stopBroker();
+        }
+    }
+
+    protected void stopBroker() {
+        // we can no longer keep the lock so lets fail
+        LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
+        try {
+            brokerService.stop();
+        } catch (Exception e) {
+            LOG.warn("Failure occurred while stopping broker");
+        }
+    }
+
+    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
+        if (clockDaemon == null) {
+            clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable, "ActiveMQ Lock KeepAlive Timer");
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+        }
+        return clockDaemon;
+    }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+}

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java (from r1383370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java&r1=1383370&r2=1383400&rev=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java Tue Sep 11 13:17:39 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import javax.sql.DataSource;
 
+import org.apache.activemq.broker.LockableServiceSupport;
 import org.apache.activemq.util.IOHelper;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 
@@ -30,16 +31,16 @@ import org.apache.derby.jdbc.EmbeddedDat
  * 
  * 
  */
-public class DataSourceSupport {
+abstract public class DataSourceServiceSupport extends LockableServiceSupport {
 
     private String dataDirectory = IOHelper.getDefaultDataDirectory();
     private File dataDirectoryFile;
     private DataSource dataSource;
 
-    public DataSourceSupport() {
+    public DataSourceServiceSupport() {
     }
 
-    public DataSourceSupport(DataSource dataSource) {
+    public DataSourceServiceSupport(DataSource dataSource) {
         this.dataSource = dataSource;
     }
 
@@ -64,7 +65,7 @@ public class DataSourceSupport {
 
     public DataSource getDataSource() throws IOException {
         if (dataSource == null) {
-            dataSource = createDataSource();
+            dataSource = createDataSource(getDataDirectoryFile().getCanonicalPath());
             if (dataSource == null) {
                 throw new IllegalArgumentException("No dataSource property has been configured");
             }
@@ -76,10 +77,10 @@ public class DataSourceSupport {
         this.dataSource = dataSource;
     }
 
-    protected DataSource createDataSource() throws IOException {
+    public static DataSource createDataSource(String homeDir) throws IOException {
 
         // Setup the Derby datasource.
-        System.setProperty("derby.system.home", getDataDirectoryFile().getCanonicalPath());
+        System.setProperty("derby.system.home", homeDir);
         System.setProperty("derby.storage.fileSyncTransactionLog", "true");
         System.setProperty("derby.storage.pageCacheSize", "100");
 
@@ -93,4 +94,6 @@ public class DataSourceSupport {
         return "" + dataSource;
     }
 
+
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Tue Sep 11 13:17:39 2012
@@ -54,6 +54,7 @@ import org.apache.activemq.util.ByteSequ
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory;
  * 
  * 
  */
-public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
+public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter,
     BrokerServiceAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
@@ -79,19 +80,17 @@ public class JDBCPersistenceAdapter exte
     private static FactoryFinder lockFactoryFinder = new FactoryFinder(
                                                                     "META-INF/services/org/apache/activemq/store/jdbc/lock/");
 
+    public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
+
     private WireFormat wireFormat = new OpenWireFormat();
     private BrokerService brokerService;
     private Statements statements;
     private JDBCAdapter adapter;
     private MemoryTransactionStore transactionStore;
     private ScheduledThreadPoolExecutor clockDaemon;
-    private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
+    private ScheduledFuture<?> cleanupTicket;
     private int cleanupPeriod = 1000 * 60 * 5;
     private boolean useExternalMessageReferences;
-    private boolean useDatabaseLock = true;
-    private long lockKeepAlivePeriod = 1000*30;
-    private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
-    private Locker locker;
     private boolean createTablesOnStartup = true;
     private DataSource lockDataSource;
     private int transactionIsolation;
@@ -106,6 +105,10 @@ public class JDBCPersistenceAdapter exte
     protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
     protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
 
+    {
+        setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
+    }
+
     public JDBCPersistenceAdapter() {
     }
 
@@ -281,8 +284,8 @@ public class JDBCPersistenceAdapter exte
         }
     }
 
-
-    public void start() throws Exception {
+    @Override
+    public void init() throws Exception {
         getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
 
         if (isCreateTablesOnStartup()) {
@@ -299,26 +302,9 @@ public class JDBCPersistenceAdapter exte
                 transactionContext.commit();
             }
         }
+    }
 
-        if (isUseDatabaseLock()) {
-            Locker service = getLocker();
-            if (service == null) {
-                LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
-            } else {
-                service.start();
-                if (lockKeepAlivePeriod > 0) {
-                    keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
-                        public void run() {
-                            databaseLockKeepAlive();
-                        }
-                    }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
-                }
-                if (brokerService != null) {
-                    brokerService.getBroker().nowMasterBroker();
-                }
-            }
-        }
-
+    public void doStart() throws Exception {
         // Cleanup the db periodically.
         if (cleanupPeriod > 0) {
             cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
@@ -331,21 +317,11 @@ public class JDBCPersistenceAdapter exte
         createMessageAudit();
     }
 
-    public synchronized void stop() throws Exception {
+    public synchronized void doStop(ServiceStopper stopper) throws Exception {
         if (cleanupTicket != null) {
             cleanupTicket.cancel(true);
             cleanupTicket = null;
         }
-        if (keepAliveTicket != null) {
-            keepAliveTicket.cancel(false);
-            keepAliveTicket = null;
-        }
-        
-        // do not shutdown clockDaemon as it may kill the thread initiating shutdown
-        Locker service = getDatabaseLocker();
-        if (service != null) {
-            service.stop();
-        }
     }
 
     public void cleanup() {
@@ -403,13 +379,6 @@ public class JDBCPersistenceAdapter exte
         return getLocker();
     }
 
-    public Locker getLocker() throws IOException {
-        if (locker == null && isUseDatabaseLock()) {
-            setLocker(loadDataBaseLocker());
-        }
-        return locker;
-    }
-
     /**
      * Sets the database locker strategy to use to lock the database on startup
      * @throws IOException
@@ -420,16 +389,6 @@ public class JDBCPersistenceAdapter exte
         setLocker(locker);
     }
 
-    /**
-     * Sets the database locker strategy to use to lock the database on startup
-     * @throws IOException 
-     */
-    public void setLocker(Locker locker) throws IOException {
-        this.locker = locker;
-        locker.configure(this);
-        locker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
-    }
-
     public DataSource getLockDataSource() throws IOException {
         if (lockDataSource == null) {
             lockDataSource = getDataSource();
@@ -595,16 +554,15 @@ public class JDBCPersistenceAdapter exte
         this.createTablesOnStartup = createTablesOnStartup;
     }
 
-    public boolean isUseDatabaseLock() {
-        return useDatabaseLock;
-    }
-
     /**
+     * @deprecated use {@link #setUseLock(boolean)} instead
+     *
      * Sets whether or not an exclusive database lock should be used to enable
      * JDBC Master/Slave. Enabled by default.
      */
+    @Deprecated
     public void setUseDatabaseLock(boolean useDatabaseLock) {
-        this.useDatabaseLock = useDatabaseLock;
+        setUseLock(useDatabaseLock);
     }
 
     public static void log(String msg, SQLException e) {
@@ -634,39 +592,13 @@ public class JDBCPersistenceAdapter exte
     public void setUsageManager(SystemUsage usageManager) {
     }
 
-    protected void databaseLockKeepAlive() {
-        boolean stop = false;
-        try {
-            Locker locker = getDatabaseLocker();
-            if (locker != null) {
-                if (!locker.keepAlive()) {
-                    stop = true;
-                }
-            }
-        } catch (IOException e) {
-            LOG.warn("databaselocker keepalive resulted in: " + e, e);
-        }
-        if (stop) {
-            stopBroker();
-        }
-    }
-
-    protected void stopBroker() {
-        // we can no longer keep the lock so lets fail
-        LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
-        try {
-            brokerService.stop();
-        } catch (Exception e) {
-            LOG.warn("Failure occurred while stopping broker");
-        }
-    }
-
-    protected Locker loadDataBaseLocker() throws IOException {
+    public Locker createDefaultLocker() throws IOException {
         DefaultDatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
         if (locker == null) {
             locker = new DefaultDatabaseLocker();
             LOG.debug("Using default JDBC Locker: " + locker);
         }
+        locker.configure(this);
         return locker;
     }
 
@@ -711,24 +643,15 @@ public class JDBCPersistenceAdapter exte
         return 0;
     }
 
-    public long getLockKeepAlivePeriod() {
-        return lockKeepAlivePeriod;
-    }
-
-    public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
-        this.lockKeepAlivePeriod = lockKeepAlivePeriod;
-    }
-
-    public long getLockAcquireSleepInterval() {
-        return lockAcquireSleepInterval;
-    }
-
     /**
+     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
+     *
      * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
      * not applied if DataBaseLocker is injected.
+     *
      */
-    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
-        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
+    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
+        getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
     }
     
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Tue Sep 11 13:17:39 2012
@@ -22,13 +22,15 @@ import java.io.IOException;
 import org.apache.activeio.journal.Journal;
 import org.apache.activeio.journal.active.JournalImpl;
 import org.apache.activeio.journal.active.JournalLockedException;
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.Statements;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean
  * 
  */
-public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
+public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory {
 
     private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
 
@@ -185,16 +187,12 @@ public class JournalPersistenceAdapterFa
         jdbcPersistenceAdapter.setStatements(statements);
     }
 
-    public boolean isUseDatabaseLock() {
-        return jdbcPersistenceAdapter.isUseDatabaseLock();
-    }
-
     /**
      * Sets whether or not an exclusive database lock should be used to enable
      * JDBC Master/Slave. Enabled by default.
      */
     public void setUseDatabaseLock(boolean useDatabaseLock) {
-        jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
+        jdbcPersistenceAdapter.setUseLock(useDatabaseLock);
     }
 
     public boolean isCreateTablesOnStartup() {
@@ -245,4 +243,18 @@ public class JournalPersistenceAdapterFa
         }
     }
 
+    @Override
+    public Locker createDefaultLocker() throws IOException {
+        return null;
+    }
+
+    @Override
+    public void init() throws Exception {
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {}
+
+    @Override
+    protected void doStart() throws Exception {}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Tue Sep 11 13:17:39 2012
@@ -17,8 +17,8 @@
 package org.apache.activemq.store.kahadb;
 
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.LockableServiceSupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -37,6 +37,7 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ServiceStopper;
 
 import java.io.File;
 import java.io.IOException;
@@ -49,9 +50,8 @@ import java.util.Set;
  * @org.apache.xbean.XBean element="kahaDB"
  *
  */
-public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
+public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
     private final KahaDBStore letter = new KahaDBStore();
-    private Locker locker;
 
     /**
      * @param context
@@ -191,8 +191,7 @@ public class KahaDBPersistenceAdapter im
      * @throws Exception
      * @see org.apache.activemq.Service#start()
      */
-    public void start() throws Exception {
-        getLocker().start();
+    public void doStart() throws Exception {
         this.letter.start();
     }
 
@@ -200,12 +199,8 @@ public class KahaDBPersistenceAdapter im
      * @throws Exception
      * @see org.apache.activemq.Service#stop()
      */
-    public void stop() throws Exception {
-        try {
-            this.letter.stop();
-        } finally {
-            getLocker().stop();
-        }
+    public void doStop(ServiceStopper stopper) throws Exception {
+        this.letter.stop();
     }
 
     /**
@@ -486,17 +481,13 @@ public class KahaDBPersistenceAdapter im
     }
 
     /**
-     * @return the databaseLockedWaitDelay
-     */
-    public int getDatabaseLockedWaitDelay() {
-        return letter.getDatabaseLockedWaitDelay();
-    }
-
-    /**
+     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
+     *
      * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
      */
-    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
-       letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
+    @Deprecated
+    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
+       getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
     }
 
     public boolean getForceRecoverIndex() {
@@ -594,25 +585,16 @@ public class KahaDBPersistenceAdapter im
         return rc;
     }
 
-    public void setLocker(Locker locker) {
-        this.locker = locker;
-    }
-
-    protected Locker getLocker() throws IOException {
-        if (this.locker == null) {
-            this.locker = createDefaultLocker();
-        }
-        return this.locker;
-    }
-
-    protected Locker createDefaultLocker() throws IOException {
+    public Locker createDefaultLocker() throws IOException {
         SharedFileLocker locker = new SharedFileLocker();
         locker.configure(this);
-        locker.setLockAcquireSleepInterval(getDatabaseLockedWaitDelay());
         return locker;
     }
 
     @Override
+    public void init() throws Exception {}
+
+    @Override
     public String toString() {
         String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
         return "KahaDBPersistenceAdapter[" + path + "]";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Sep 11 13:17:39 2012
@@ -221,7 +221,6 @@ public abstract class MessageDatabase ex
     private int indexCacheSize = 10000;
     private boolean checkForCorruptJournalFiles = false;
     private boolean checksumJournalFiles = false;
-    private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
     protected boolean forceRecoverIndex = false;
     private final Object checkpointThreadLock = new Object();
     private boolean rewriteOnRedelivery = false;
@@ -2308,20 +2307,6 @@ public abstract class MessageDatabase ex
         this.directoryArchive = directoryArchive;
     }
 
-    /**
-     * @return the databaseLockedWaitDelay
-     */
-    public int getDatabaseLockedWaitDelay() {
-        return this.databaseLockedWaitDelay;
-    }
-
-    /**
-     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
-     */
-    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
-        this.databaseLockedWaitDelay = databaseLockedWaitDelay;
-    }
-
     public boolean isRewriteOnRedelivery() {
         return rewriteOnRedelivery;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java Tue Sep 11 13:17:39 2012
@@ -51,6 +51,7 @@ public abstract class ServiceSupport imp
             boolean success = false;
             stopped.set(false);
             try {
+                preStart();
                 doStart();
                 success = true;
             } finally {
@@ -70,6 +71,8 @@ public abstract class ServiceSupport imp
                 doStop(stopper);
             } catch (Exception e) {
                 stopper.onException(this, e);
+            } finally {
+                postStop(stopper);
             }
             stopped.set(true);
             started.set(false);
@@ -110,7 +113,23 @@ public abstract class ServiceSupport imp
         this.serviceListeners.remove(l);
     }
 
+    /**
+     *
+     * handle for various operations after stopping the service (like locking)
+     *
+     * @throws Exception
+     */
+    protected void postStop(ServiceStopper stopper) throws Exception {}
+
     protected abstract void doStop(ServiceStopper stopper) throws Exception;
 
+    /**
+     *
+     * handle for various operations before starting the service (like locking)
+     *
+     * @throws Exception
+     */
+    protected void preStart() throws Exception {}
+
     protected abstract void doStart() throws Exception;
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java Tue Sep 11 13:17:39 2012
@@ -17,14 +17,11 @@
 package org.apache.activemq.broker.ft;
 
 import java.io.IOException;
-import java.sql.Connection;
 import java.util.concurrent.TimeUnit;
-import junit.framework.Test;
+
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.store.jdbc.DataSourceSupport;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +31,7 @@ public class DbRestartJDBCQueueMasterSla
     @Override
     protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
         super.configureJdbcPersistenceAdapter(persistenceAdapter);
-        persistenceAdapter.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
+        persistenceAdapter.getLocker().setLockAcquireSleepInterval(getLockAcquireSleepInterval());
         persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
         persistenceAdapter.setLocker(new LeaseDatabaseLocker());
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java Tue Sep 11 13:17:39 2012
@@ -28,9 +28,10 @@ import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOHelper;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class DbRestartJDBCQueueTest exte
         topic = false;
         verbose = true;
         // startup db
-        sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
+        sharedDs = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
 
         broker = new BrokerService();
 
@@ -65,9 +66,9 @@ public class DbRestartJDBCQueueTest exte
         broker.setDeleteAllMessagesOnStartup(true);
         JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
         persistenceAdapter.setDataSource(sharedDs);
-        persistenceAdapter.setUseDatabaseLock(false);
+        persistenceAdapter.setUseLock(false);
         persistenceAdapter.setLockKeepAlivePeriod(500);
-        persistenceAdapter.setLockAcquireSleepInterval(500);
+        persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
         broker.setPersistenceAdapter(persistenceAdapter);
         broker.start();
         super.setUp();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java Tue Sep 11 13:17:39 2012
@@ -28,8 +28,9 @@ import javax.sql.DataSource;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 
 public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
@@ -39,7 +40,7 @@ public class JDBCQueueMasterSlaveTest ex
 
     protected void setUp() throws Exception {
         // startup db
-        sharedDs = new SyncDataSource((EmbeddedDataSource)new DataSourceSupport().getDataSource());
+        sharedDs = new SyncDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
         super.setUp();
     }
 
@@ -96,7 +97,7 @@ public class JDBCQueueMasterSlaveTest ex
 
     protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
         persistenceAdapter.setLockKeepAlivePeriod(500);
-        persistenceAdapter.setLockAcquireSleepInterval(500);
+        persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
     }
 
     protected DataSource getExistingDataSource() throws Exception {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java Tue Sep 11 13:17:39 2012
@@ -347,7 +347,7 @@ public class FailoverStaticNetworkTest {
                         brokerA.setBrokerName("Pair");
                         brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
                                 + JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker"));
-                        ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
+                        ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000);
                         brokerA.start();
                         brokerA.waitUntilStopped();
 
@@ -378,7 +378,7 @@ public class FailoverStaticNetworkTest {
                         // so they can coexist in local jmx we set the object name b/c the brokername identifies the shared store
                         brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
                             + JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker"));
-                        ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
+                        ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000);
                         brokerA1.start();
                         brokerA1.waitUntilStopped();
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java Tue Sep 11 13:17:39 2012
@@ -164,7 +164,7 @@ public class JDBCCommitExceptionTest ext
         dataSource.setCreateDatabase("create");
 
         jdbc.setDataSource(dataSource);
-        jdbc.setUseDatabaseLock(false);
+        jdbc.setUseLock(false);
         jdbc.deleteAllMessages();
 
         broker.setPersistenceAdapter(jdbc);