You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/08/30 12:47:50 UTC
svn commit: r1378881 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/store/jdbc/adapter/
main/java/org/apache/activemq/store/kahadb/ test/java/org/apa...
Author: dejanb
Date: Thu Aug 30 10:47:49 2012
New Revision: 1378881
URL: http://svn.apache.org/viewvc?rev=1378881&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4005 - plugable lockers; introduce new API and refactor current KahaDB and JDBC solutions
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java
- copied, changed from r1378859, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java?rev=1378881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java Thu Aug 30 10:47:49 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.Locker;
+import org.apache.activemq.util.ServiceSupport;
+
+import java.io.IOException;
+
+public abstract class AbstractLocker extends ServiceSupport implements Locker {
+
+ public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 10 * 1000;
+
+ protected String name;
+ protected boolean failIfLocked = false;
+ protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
+
+ @Override
+ public boolean keepAlive() throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
+ this.lockAcquireSleepInterval = lockAcquireSleepInterval;
+ }
+
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void setFailIfLocked(boolean failIfLocked) {
+ this.failIfLocked = failIfLocked;
+ }
+}
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java (from r1378859, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java&r1=1378859&r2=1378881&rev=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Locker.java Thu Aug 30 10:47:49 2012
@@ -14,27 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.store.jdbc;
-
-import java.io.IOException;
+package org.apache.activemq.broker;
import org.apache.activemq.Service;
+import org.apache.activemq.store.PersistenceAdapter;
+
+import java.io.IOException;
/**
- * Represents some kind of lock service to ensure that a broker is the only master
- *
- *
+ * Represents a lock service to ensure that a broker is the only master
*/
-public interface DatabaseLocker extends Service {
+public interface Locker extends Service {
/**
- * allow the injection of a jdbc persistence adapter
- * @param adapter the persistence adapter to use
- * @throws IOException
- */
- void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException;
-
- /**
* Used by a timer to keep alive the lock.
* If the method returns false the broker should be terminated
* if an exception is thrown, the lock state cannot be determined
@@ -43,8 +35,32 @@ public interface DatabaseLocker extends
/**
* set the delay interval in milliseconds between lock acquire attempts
+ *
* @param lockAcquireSleepInterval the sleep interval in miliseconds
*/
void setLockAcquireSleepInterval(long lockAcquireSleepInterval);
-
+
+ /**
+ * Set the name of the lock to use.
+ */
+ public void setName(String name);
+
+ /**
+ * Specify whether to fail immediately if the lock is already held. When set, the CustomLock must throw an
+ * IOException immediately upon detecting the lock is already held.
+ *
+ * @param failIfLocked: true => fail immediately if the lock is held; false => block until the lock can be obtained
+ * (default).
+ */
+ public void setFailIfLocked(boolean failIfLocked);
+
+
+ /**
+ * Configure the locker with the persistence adapter currently used
+ *
+ * @param persistenceAdapter
+ * @throws IOException
+ */
+ public void configure(PersistenceAdapter persistenceAdapter) throws IOException;
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java Thu Aug 30 10:47:49 2012
@@ -22,9 +22,10 @@ import org.apache.activemq.Service;
/**
* Represents some kind of lock service to ensure that a broker is the only master
- *
- *
+ *
+ * @deprecated As of 5.7.0, use more general {@link org.apache.activemq.broker.Locker} instead
*/
+@Deprecated
public interface DatabaseLocker extends Service {
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java Thu Aug 30 10:47:49 2012
@@ -24,7 +24,10 @@ import java.sql.SQLFeatureNotSupportedEx
import javax.sql.DataSource;
+import org.apache.activemq.broker.AbstractLocker;
+import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.Handler;
+import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,12 +38,11 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean element="database-locker"
*
*/
-public class DefaultDatabaseLocker implements DatabaseLocker {
+public class DefaultDatabaseLocker extends AbstractLocker {
public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000;
private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
protected DataSource dataSource;
protected Statements statements;
- protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
protected PreparedStatement lockCreateStatement;
protected PreparedStatement lockUpdateStatement;
@@ -48,20 +50,16 @@ public class DefaultDatabaseLocker imple
protected boolean stopping;
protected Handler<Exception> exceptionHandler;
protected int queryTimeout = 10;
-
- public DefaultDatabaseLocker() {
- }
-
- public DefaultDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
- setPersistenceAdapter(persistenceAdapter);
- }
- public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
- this.dataSource = adapter.getLockDataSource();
- this.statements = adapter.getStatements();
+ public void configure(PersistenceAdapter adapter) throws IOException {
+ if (adapter instanceof JDBCPersistenceAdapter) {
+ this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
+ this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
+ }
+ lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
}
- public void start() throws Exception {
+ public void doStart() throws Exception {
stopping = false;
LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
@@ -134,7 +132,7 @@ public class DefaultDatabaseLocker imple
LOG.info("Becoming the master on dataSource: " + dataSource);
}
- public void stop() throws Exception {
+ public void doStop(ServiceStopper stopper) throws Exception {
stopping = true;
try {
if (lockCreateStatement != null) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java Thu Aug 30 10:47:49 2012
@@ -17,6 +17,8 @@
package org.apache.activemq.store.jdbc;
import java.io.IOException;
+
+import org.apache.activemq.broker.Locker;
import org.apache.activemq.util.DefaultIOExceptionHandler;
/**
@@ -34,7 +36,7 @@ public class JDBCIOExceptionHandler exte
boolean hasLock = true;
if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) broker.getPersistenceAdapter();
- DatabaseLocker locker = jdbcPersistenceAdapter.getDatabaseLocker();
+ Locker locker = jdbcPersistenceAdapter.getDatabaseLocker();
if (locker != null) {
try {
if (!locker.keepAlive()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Aug 30 10:47:49 2012
@@ -41,6 +41,7 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
@@ -89,7 +90,7 @@ public class JDBCPersistenceAdapter exte
private boolean useDatabaseLock = true;
private long lockKeepAlivePeriod = 1000*30;
private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
- private DatabaseLocker databaseLocker;
+ private Locker locker;
private boolean createTablesOnStartup = true;
private DataSource lockDataSource;
private int transactionIsolation;
@@ -299,7 +300,7 @@ public class JDBCPersistenceAdapter exte
}
if (isUseDatabaseLock()) {
- DatabaseLocker service = getDatabaseLocker();
+ Locker service = getDatabaseLocker();
if (service == null) {
LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
} else {
@@ -340,7 +341,7 @@ public class JDBCPersistenceAdapter exte
}
// do not shutdown clockDaemon as it may kill the thread initiating shutdown
- DatabaseLocker service = getDatabaseLocker();
+ Locker service = getDatabaseLocker();
if (service != null) {
service.stop();
}
@@ -392,21 +393,40 @@ public class JDBCPersistenceAdapter exte
return adapter;
}
- public DatabaseLocker getDatabaseLocker() throws IOException {
- if (databaseLocker == null && isUseDatabaseLock()) {
- setDatabaseLocker(loadDataBaseLocker());
+ /**
+ *
+ * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
+ */
+ @Deprecated
+ public Locker getDatabaseLocker() throws IOException {
+ return getLocker();
+ }
+
+ public Locker getLocker() throws IOException {
+ if (locker == null && isUseDatabaseLock()) {
+ setLocker(loadDataBaseLocker());
}
- return databaseLocker;
+ return locker;
+ }
+
+ /**
+ * Sets the database locker strategy to use to lock the database on startup
+ * @throws IOException
+ *
+ * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
+ */
+ public void setDatabaseLocker(Locker locker) throws IOException {
+ setLocker(locker);
}
/**
* Sets the database locker strategy to use to lock the database on startup
* @throws IOException
*/
- public void setDatabaseLocker(DatabaseLocker locker) throws IOException {
- databaseLocker = locker;
- databaseLocker.setPersistenceAdapter(this);
- databaseLocker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
+ public void setLocker(Locker locker) throws IOException {
+ this.locker = locker;
+ locker.configure(this);
+ locker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
}
public DataSource getLockDataSource() throws IOException {
@@ -616,7 +636,7 @@ public class JDBCPersistenceAdapter exte
protected void databaseLockKeepAlive() {
boolean stop = false;
try {
- DatabaseLocker locker = getDatabaseLocker();
+ Locker locker = getDatabaseLocker();
if (locker != null) {
if (!locker.keepAlive()) {
stop = true;
@@ -640,8 +660,8 @@ public class JDBCPersistenceAdapter exte
}
}
- protected DatabaseLocker loadDataBaseLocker() throws IOException {
- DatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
+ protected Locker loadDataBaseLocker() throws IOException {
+ DefaultDatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
if (locker == null) {
locker = new DefaultDatabaseLocker();
LOG.debug("Using default JDBC Locker: " + locker);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java Thu Aug 30 10:47:49 2012
@@ -25,7 +25,11 @@ import java.sql.Timestamp;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
+
+import org.apache.activemq.broker.AbstractLocker;
+import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,12 +40,11 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean element="lease-database-locker"
*
*/
-public class LeaseDatabaseLocker implements DatabaseLocker {
+public class LeaseDatabaseLocker extends AbstractLocker {
private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 5000;
protected DataSource dataSource;
protected Statements statements;
- protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
protected boolean stopping;
protected int maxAllowableDiffFromDBTime = 0;
@@ -51,13 +54,16 @@ public class LeaseDatabaseLocker impleme
JDBCPersistenceAdapter persistenceAdapter;
- public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
- this.dataSource = adapter.getLockDataSource();
- this.statements = adapter.getStatements();
- this.persistenceAdapter = adapter;
+ public void configure(PersistenceAdapter adapter) throws IOException {
+ if (adapter instanceof JDBCPersistenceAdapter) {
+ this.persistenceAdapter = (JDBCPersistenceAdapter)adapter;
+ this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
+ this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
+ }
+ lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
}
- public void start() throws Exception {
+ public void doStart() throws Exception {
stopping = false;
LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the Master broker");
@@ -176,7 +182,7 @@ public class LeaseDatabaseLocker impleme
return result;
}
- public void stop() throws Exception {
+ public void doStop(ServiceStopper stopper) throws Exception {
releaseLease();
stopping = true;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/TransactDatabaseLocker.java Thu Aug 30 10:47:49 2012
@@ -36,15 +36,8 @@ import org.slf4j.LoggerFactory;
public class TransactDatabaseLocker extends DefaultDatabaseLocker {
private static final Logger LOG = LoggerFactory.getLogger(TransactDatabaseLocker.class);
- public TransactDatabaseLocker() {
- }
-
- public TransactDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
- setPersistenceAdapter(persistenceAdapter);
- }
-
@Override
- public void start() throws Exception {
+ public void doStart() throws Exception {
stopping = false;
LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu Aug 30 10:47:49 2012
@@ -27,6 +27,7 @@ import org.apache.activemq.command.Produ
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
@@ -49,6 +50,7 @@ import java.util.Set;
*/
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private final KahaDBStore letter = new KahaDBStore();
+ private Locker locker;
/**
* @param context
@@ -189,6 +191,7 @@ public class KahaDBPersistenceAdapter im
* @see org.apache.activemq.Service#start()
*/
public void start() throws Exception {
+ getLocker().start();
this.letter.start();
}
@@ -197,7 +200,11 @@ public class KahaDBPersistenceAdapter im
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception {
- this.letter.stop();
+ try {
+ this.letter.stop();
+ } finally {
+ getLocker().stop();
+ }
}
/**
@@ -586,6 +593,24 @@ public class KahaDBPersistenceAdapter im
return rc;
}
+ public void setLocker(Locker locker) {
+ this.locker = locker;
+ }
+
+ protected Locker getLocker() throws IOException {
+ if (this.locker == null) {
+ this.locker = createDefaultLocker();
+ }
+ return this.locker;
+ }
+
+ protected Locker createDefaultLocker() throws IOException {
+ SharedFileLocker locker = new SharedFileLocker();
+ locker.configure(this);
+ locker.setLockAcquireSleepInterval(getDatabaseLockedWaitDelay());
+ return locker;
+ }
+
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Aug 30 10:47:49 2012
@@ -368,49 +368,10 @@ public abstract class MessageDatabase ex
}
}
- private void lock() throws IOException {
-
- if (lockFile == null) {
- File lockFileName = new File(directory, "lock");
- lockFile = new LockFile(lockFileName, true);
- if (failIfDatabaseIsLocked) {
- lockFile.lock();
- } else {
- boolean locked = false;
- while ((!isStopped()) && (!isStopping())) {
- try {
- lockFile.lock();
- locked = true;
- break;
- } catch (IOException e) {
- LOG.info("Database "
- + lockFileName
- + " is locked... waiting "
- + (getDatabaseLockedWaitDelay() / 1000)
- + " seconds for the database to be unlocked. Reason: "
- + e);
- try {
- Thread.sleep(getDatabaseLockedWaitDelay());
- } catch (InterruptedException e1) {
- }
- }
- }
- if (!locked) {
- throw new IOException("attempt to obtain lock aborted due to shutdown");
- }
- }
- }
- }
-
- // for testing
- public LockFile getLockFile() {
- return lockFile;
- }
-
public void load() throws IOException {
this.indexLock.writeLock().lock();
+ IOHelper.mkdirs(directory);
try {
- lock();
if (deleteAllMessages) {
getJournal().start();
getJournal().delete();
@@ -430,30 +391,25 @@ public abstract class MessageDatabase ex
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
+ this.indexLock.writeLock().lock();
try {
- this.indexLock.writeLock().lock();
- try {
- if (metadata.page != null) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, true);
- }
- });
- }
- pageFile.unload();
- metadata = new Metadata();
- } finally {
- this.indexLock.writeLock().unlock();
- }
- journal.close();
- synchronized (checkpointThreadLock) {
- if (checkpointThread != null) {
- checkpointThread.join();
- }
+ if (metadata.page != null) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx, true);
+ }
+ });
}
+ pageFile.unload();
+ metadata = new Metadata();
} finally {
- lockFile.unlock();
- lockFile=null;
+ this.indexLock.writeLock().unlock();
+ }
+ journal.close();
+ synchronized (checkpointThreadLock) {
+ if (checkpointThread != null) {
+ checkpointThread.join();
+ }
}
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java?rev=1378881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/SharedFileLocker.java Thu Aug 30 10:47:49 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.broker.AbstractLocker;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.kahadb.util.LockFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Represents an exclusive lock on a database to avoid multiple brokers running
+ * against the same logical database.
+ *
+ * @org.apache.xbean.XBean element="shared-file-locker"
+ *
+ */
+public class SharedFileLocker extends AbstractLocker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SharedFileLocker.class);
+
+ private LockFile lockFile;
+ protected File directory = MessageDatabase.DEFAULT_DIRECTORY;
+
+ @Override
+ public void doStart() throws Exception {
+ if (lockFile == null) {
+ File lockFileName = new File(directory, "lock");
+ lockFile = new LockFile(lockFileName, true);
+ if (failIfLocked) {
+ lockFile.lock();
+ } else {
+ boolean locked = false;
+ while ((!isStopped()) && (!isStopping())) {
+ try {
+ lockFile.lock();
+ locked = true;
+ break;
+ } catch (IOException e) {
+ LOG.info("Database "
+ + lockFileName
+ + " is locked... waiting "
+ + (lockAcquireSleepInterval / 1000)
+ + " seconds for the database to be unlocked. Reason: "
+ + e);
+ try {
+ Thread.sleep(lockAcquireSleepInterval);
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+ if (!locked) {
+ throw new IOException("attempt to obtain lock aborted due to shutdown");
+ }
+ }
+ }
+ }
+
+ @Override
+ public void doStop(ServiceStopper stopper) throws Exception {
+ lockFile.unlock();
+ lockFile=null;
+ }
+
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ @Override
+ public void configure(PersistenceAdapter persistenceAdapter) throws IOException {
+ this.setDirectory(persistenceAdapter.getDirectory());
+ }
+}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java?rev=1378881&r1=1378880&r2=1378881&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java Thu Aug 30 10:47:49 2012
@@ -55,11 +55,11 @@ public class LeaseDatabaseLockerTest {
LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
brokerService.setBrokerName("First");
- lockerA.setPersistenceAdapter(jdbc);
+ lockerA.configure(jdbc);
final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
brokerService.setBrokerName("Second");
- lockerB.setPersistenceAdapter(jdbc);
+ lockerB.configure(jdbc);
final AtomicBoolean blocked = new AtomicBoolean(true);
final Connection connection = dataSource.getConnection();
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java?rev=1378881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/CustomLockerTest.java Thu Aug 30 10:47:49 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import junit.framework.TestCase;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+
+public class CustomLockerTest extends TestCase {
+
+ public void testCustomLocker() throws Exception {
+ BrokerService broker = BrokerFactory.createBroker("xbean:org/apache/activemq/store/kahadb/shared.xml");
+ broker.waitUntilStarted();
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+}
Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml?rev=1378881&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/store/kahadb/shared.xml Thu Aug 30 10:47:49 2012
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!-- normal ActiveMQ XML config which is less verbose & can be validated -->
+ <amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
+ useLoggingForShutdownErrors="true" useJmx="true"
+ persistent="true" vmConnectorURI="vm://javacoola"
+ useShutdownHook="false" deleteAllMessagesOnStartup="true">
+
+ <amq:persistenceAdapter>
+ <amq:kahaDB directory = "target/activemq-data">
+ <amq:locker>
+ <amq:shared-file-locker lockAcquireSleepInterval="5000"/>
+ </amq:locker>
+ </amq:kahaDB>
+ </amq:persistenceAdapter>
+
+ <amq:systemUsage>
+ <amq:systemUsage>
+ <amq:memoryUsage>
+ <amq:memoryUsage limit="10 mb" percentUsageMinDelta="20"/>
+ </amq:memoryUsage>
+ <amq:storeUsage>
+ <amq:storeUsage limit="1 gb" name="foo"/>
+ </amq:storeUsage>
+ <amq:tempUsage>
+ <amq:tempUsage limit="100 mb"/>
+ </amq:tempUsage>
+ </amq:systemUsage>
+ </amq:systemUsage>
+
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:61635"/>
+ </amq:transportConnectors>
+
+ </amq:broker>
+
+</beans>
\ No newline at end of file