You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/09/11 15:17:40 UTC
svn commit: r1383400 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/store/journal/
main/java/org/apache/activemq/store/kahadb/ main/java/org/apache/a...
Author: dejanb
Date: Tue Sep 11 13:17:39 2012
New Revision: 1383400
URL: http://svn.apache.org/viewvc?rev=1383400&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4005 - introducing Lockable interface and LockableServiceSupport for easier dealing with locks
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java
- copied, changed from r1383370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java
- copied, changed from r1383370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java Tue Sep 11 13:17:39 2012
@@ -31,7 +31,7 @@ public abstract class AbstractLocker ext
@Override
public boolean keepAlive() throws IOException {
- return false;
+ return true;
}
@Override
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java (from r1383370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java&r1=1383370&r2=1383400&rev=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Lockable.java Tue Sep 11 13:17:39 2012
@@ -16,36 +16,42 @@
*/
package org.apache.activemq.broker;
-import org.apache.activemq.broker.Locker;
-import org.apache.activemq.util.ServiceSupport;
-
import java.io.IOException;
-public abstract class AbstractLocker extends ServiceSupport implements Locker {
+/**
+ * A lockable broker resource. Uses a {@link Locker} to guarantee that only a single instance is running.
+ *
+ */
+public interface Lockable {
- public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 10 * 1000;
+ /**
+ * Turn locking on/off on the resource
+ *
+ * @param useLock
+ */
+ public void setUseLock(boolean useLock);
+
+ /**
+ * Create a default locker
+ *
+ * @return default locker
+ * @throws IOException
+ */
+ public Locker createDefaultLocker() throws IOException;
+
+ /**
+ * Set locker to be used
+ *
+ * @param locker
+ * @throws IOException
+ */
+ public void setLocker(Locker locker) throws IOException;
+
+ /**
+ * Period (in milliseconds) at which {@link org.apache.activemq.broker.Locker#keepAlive()} should be checked
+ *
+ * @param lockKeepAlivePeriod
+ */
+ public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
- protected String name;
- protected boolean failIfLocked = false;
- protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
-
- @Override
- public boolean keepAlive() throws IOException {
- return false;
- }
-
- @Override
- public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
- this.lockAcquireSleepInterval = lockAcquireSleepInterval;
- }
-
- @Override
- public void setName(String name) {
- this.name = name;
- }
-
- @Override
- public void setFailIfLocked(boolean failIfLocked) {
- this.failIfLocked = failIfLocked;
- }
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java?rev=1383400&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java Tue Sep 11 13:17:39 2012
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Helper class for working with services that require locking
+ */
+public abstract class LockableServiceSupport extends ServiceSupport implements Lockable, BrokerServiceAware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class);
+ boolean useLock = true;
+ Locker locker;
+ long lockKeepAlivePeriod = 0;
+ private ScheduledFuture<?> keepAliveTicket;
+ private ScheduledThreadPoolExecutor clockDaemon;
+ private BrokerService brokerService;
+
+ /**
+ * Initialize resources before locking
+ *
+ * @throws Exception
+ */
+ abstract public void init() throws Exception;
+
+ @Override
+ public void setUseLock(boolean useLock) {
+ this.useLock = useLock;
+ }
+
+ @Override
+ public void setLocker(Locker locker) throws IOException {
+ this.locker = locker;
+ if (this instanceof PersistenceAdapter) {
+ this.locker.configure((PersistenceAdapter)this);
+ }
+ }
+
+ public Locker getLocker() throws IOException {
+ if (this.locker == null) {
+ this.locker = createDefaultLocker();
+ }
+ return this.locker;
+ }
+
+ @Override
+ public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
+ this.lockKeepAlivePeriod = lockKeepAlivePeriod;
+ }
+
+ @Override
+ public void preStart() throws Exception {
+ init();
+ if (useLock) {
+ if (getLocker() == null) {
+ LOG.warn("No locker configured");
+ } else {
+ getLocker().start();
+ if (lockKeepAlivePeriod > 0) {
+ keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ keepLockAlive();
+ }
+ }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
+ }
+ if (brokerService != null) {
+ brokerService.getBroker().nowMasterBroker();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void postStop(ServiceStopper stopper) throws Exception {
+ if (useLock) {
+ if (keepAliveTicket != null) {
+ keepAliveTicket.cancel(false);
+ keepAliveTicket = null;
+ }
+ if (locker != null) {
+ getLocker().stop();
+ }
+ ThreadPoolUtils.shutdown(clockDaemon);
+ }
+ }
+
+ protected void keepLockAlive() {
+ boolean stop = false;
+ try {
+ Locker locker = getLocker();
+ if (locker != null) {
+ if (!locker.keepAlive()) {
+ stop = true;
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("locker keepalive resulted in: " + e, e);
+ }
+ if (stop) {
+ stopBroker();
+ }
+ }
+
+ protected void stopBroker() {
+ // we can no longer keep the lock, so let's fail
+ LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
+ try {
+ brokerService.stop();
+ } catch (Exception e) {
+ LOG.warn("Failure occurred while stopping broker");
+ }
+ }
+
+ public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
+ if (clockDaemon == null) {
+ clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "ActiveMQ Lock KeepAlive Timer");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ }
+ return clockDaemon;
+ }
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+}
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java (from r1383370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java&r1=1383370&r2=1383400&rev=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceServiceSupport.java Tue Sep 11 13:17:39 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
import javax.sql.DataSource;
+import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -30,16 +31,16 @@ import org.apache.derby.jdbc.EmbeddedDat
*
*
*/
-public class DataSourceSupport {
+abstract public class DataSourceServiceSupport extends LockableServiceSupport {
private String dataDirectory = IOHelper.getDefaultDataDirectory();
private File dataDirectoryFile;
private DataSource dataSource;
- public DataSourceSupport() {
+ public DataSourceServiceSupport() {
}
- public DataSourceSupport(DataSource dataSource) {
+ public DataSourceServiceSupport(DataSource dataSource) {
this.dataSource = dataSource;
}
@@ -64,7 +65,7 @@ public class DataSourceSupport {
public DataSource getDataSource() throws IOException {
if (dataSource == null) {
- dataSource = createDataSource();
+ dataSource = createDataSource(getDataDirectoryFile().getCanonicalPath());
if (dataSource == null) {
throw new IllegalArgumentException("No dataSource property has been configured");
}
@@ -76,10 +77,10 @@ public class DataSourceSupport {
this.dataSource = dataSource;
}
- protected DataSource createDataSource() throws IOException {
+ public static DataSource createDataSource(String homeDir) throws IOException {
// Setup the Derby datasource.
- System.setProperty("derby.system.home", getDataDirectoryFile().getCanonicalPath());
+ System.setProperty("derby.system.home", homeDir);
System.setProperty("derby.storage.fileSyncTransactionLog", "true");
System.setProperty("derby.storage.pageCacheSize", "100");
@@ -93,4 +94,6 @@ public class DataSourceSupport {
return "" + dataSource;
}
+
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Tue Sep 11 13:17:39 2012
@@ -54,6 +54,7 @@ import org.apache.activemq.util.ByteSequ
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory;
*
*
*/
-public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
+public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter,
BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
@@ -79,19 +80,17 @@ public class JDBCPersistenceAdapter exte
private static FactoryFinder lockFactoryFinder = new FactoryFinder(
"META-INF/services/org/apache/activemq/store/jdbc/lock/");
+ public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
+
private WireFormat wireFormat = new OpenWireFormat();
private BrokerService brokerService;
private Statements statements;
private JDBCAdapter adapter;
private MemoryTransactionStore transactionStore;
private ScheduledThreadPoolExecutor clockDaemon;
- private ScheduledFuture<?> cleanupTicket, keepAliveTicket;
+ private ScheduledFuture<?> cleanupTicket;
private int cleanupPeriod = 1000 * 60 * 5;
private boolean useExternalMessageReferences;
- private boolean useDatabaseLock = true;
- private long lockKeepAlivePeriod = 1000*30;
- private long lockAcquireSleepInterval = DefaultDatabaseLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
- private Locker locker;
private boolean createTablesOnStartup = true;
private DataSource lockDataSource;
private int transactionIsolation;
@@ -106,6 +105,10 @@ public class JDBCPersistenceAdapter exte
protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
+ {
+ setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
+ }
+
public JDBCPersistenceAdapter() {
}
@@ -281,8 +284,8 @@ public class JDBCPersistenceAdapter exte
}
}
-
- public void start() throws Exception {
+ @Override
+ public void init() throws Exception {
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
if (isCreateTablesOnStartup()) {
@@ -299,26 +302,9 @@ public class JDBCPersistenceAdapter exte
transactionContext.commit();
}
}
+ }
- if (isUseDatabaseLock()) {
- Locker service = getLocker();
- if (service == null) {
- LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
- } else {
- service.start();
- if (lockKeepAlivePeriod > 0) {
- keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
- public void run() {
- databaseLockKeepAlive();
- }
- }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
- }
- if (brokerService != null) {
- brokerService.getBroker().nowMasterBroker();
- }
- }
- }
-
+ public void doStart() throws Exception {
// Cleanup the db periodically.
if (cleanupPeriod > 0) {
cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
@@ -331,21 +317,11 @@ public class JDBCPersistenceAdapter exte
createMessageAudit();
}
- public synchronized void stop() throws Exception {
+ public synchronized void doStop(ServiceStopper stopper) throws Exception {
if (cleanupTicket != null) {
cleanupTicket.cancel(true);
cleanupTicket = null;
}
- if (keepAliveTicket != null) {
- keepAliveTicket.cancel(false);
- keepAliveTicket = null;
- }
-
- // do not shutdown clockDaemon as it may kill the thread initiating shutdown
- Locker service = getDatabaseLocker();
- if (service != null) {
- service.stop();
- }
}
public void cleanup() {
@@ -403,13 +379,6 @@ public class JDBCPersistenceAdapter exte
return getLocker();
}
- public Locker getLocker() throws IOException {
- if (locker == null && isUseDatabaseLock()) {
- setLocker(loadDataBaseLocker());
- }
- return locker;
- }
-
/**
* Sets the database locker strategy to use to lock the database on startup
* @throws IOException
@@ -420,16 +389,6 @@ public class JDBCPersistenceAdapter exte
setLocker(locker);
}
- /**
- * Sets the database locker strategy to use to lock the database on startup
- * @throws IOException
- */
- public void setLocker(Locker locker) throws IOException {
- this.locker = locker;
- locker.configure(this);
- locker.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
- }
-
public DataSource getLockDataSource() throws IOException {
if (lockDataSource == null) {
lockDataSource = getDataSource();
@@ -595,16 +554,15 @@ public class JDBCPersistenceAdapter exte
this.createTablesOnStartup = createTablesOnStartup;
}
- public boolean isUseDatabaseLock() {
- return useDatabaseLock;
- }
-
/**
+ * @deprecated use {@link #setUseLock(boolean)} instead
+ *
* Sets whether or not an exclusive database lock should be used to enable
* JDBC Master/Slave. Enabled by default.
*/
+ @Deprecated
public void setUseDatabaseLock(boolean useDatabaseLock) {
- this.useDatabaseLock = useDatabaseLock;
+ setUseLock(useDatabaseLock);
}
public static void log(String msg, SQLException e) {
@@ -634,39 +592,13 @@ public class JDBCPersistenceAdapter exte
public void setUsageManager(SystemUsage usageManager) {
}
- protected void databaseLockKeepAlive() {
- boolean stop = false;
- try {
- Locker locker = getDatabaseLocker();
- if (locker != null) {
- if (!locker.keepAlive()) {
- stop = true;
- }
- }
- } catch (IOException e) {
- LOG.warn("databaselocker keepalive resulted in: " + e, e);
- }
- if (stop) {
- stopBroker();
- }
- }
-
- protected void stopBroker() {
- // we can no longer keep the lock so lets fail
- LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
- try {
- brokerService.stop();
- } catch (Exception e) {
- LOG.warn("Failure occurred while stopping broker");
- }
- }
-
- protected Locker loadDataBaseLocker() throws IOException {
+ public Locker createDefaultLocker() throws IOException {
DefaultDatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
if (locker == null) {
locker = new DefaultDatabaseLocker();
LOG.debug("Using default JDBC Locker: " + locker);
}
+ locker.configure(this);
return locker;
}
@@ -711,24 +643,15 @@ public class JDBCPersistenceAdapter exte
return 0;
}
- public long getLockKeepAlivePeriod() {
- return lockKeepAlivePeriod;
- }
-
- public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
- this.lockKeepAlivePeriod = lockKeepAlivePeriod;
- }
-
- public long getLockAcquireSleepInterval() {
- return lockAcquireSleepInterval;
- }
-
/**
+ * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
+ *
* millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
* not applied if DataBaseLocker is injected.
+ *
*/
- public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
- this.lockAcquireSleepInterval = lockAcquireSleepInterval;
+ public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
+ getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapterFactory.java Tue Sep 11 13:17:39 2012
@@ -22,13 +22,15 @@ import java.io.IOException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activeio.journal.active.JournalLockedException;
+import org.apache.activemq.broker.Locker;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory;
* @org.apache.xbean.XBean
*
*/
-public class JournalPersistenceAdapterFactory extends DataSourceSupport implements PersistenceAdapterFactory {
+public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory {
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -185,16 +187,12 @@ public class JournalPersistenceAdapterFa
jdbcPersistenceAdapter.setStatements(statements);
}
- public boolean isUseDatabaseLock() {
- return jdbcPersistenceAdapter.isUseDatabaseLock();
- }
-
/**
* Sets whether or not an exclusive database lock should be used to enable
* JDBC Master/Slave. Enabled by default.
*/
public void setUseDatabaseLock(boolean useDatabaseLock) {
- jdbcPersistenceAdapter.setUseDatabaseLock(useDatabaseLock);
+ jdbcPersistenceAdapter.setUseLock(useDatabaseLock);
}
public boolean isCreateTablesOnStartup() {
@@ -245,4 +243,18 @@ public class JournalPersistenceAdapterFa
}
}
+ @Override
+ public Locker createDefaultLocker() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void init() throws Exception {
+ }
+
+ @Override
+ protected void doStop(ServiceStopper stopper) throws Exception {}
+
+ @Override
+ protected void doStart() throws Exception {}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Tue Sep 11 13:17:39 2012
@@ -17,8 +17,8 @@
package org.apache.activemq.store.kahadb;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -37,6 +37,7 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ServiceStopper;
import java.io.File;
import java.io.IOException;
@@ -49,9 +50,8 @@ import java.util.Set;
* @org.apache.xbean.XBean element="kahaDB"
*
*/
-public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
+public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
private final KahaDBStore letter = new KahaDBStore();
- private Locker locker;
/**
* @param context
@@ -191,8 +191,7 @@ public class KahaDBPersistenceAdapter im
* @throws Exception
* @see org.apache.activemq.Service#start()
*/
- public void start() throws Exception {
- getLocker().start();
+ public void doStart() throws Exception {
this.letter.start();
}
@@ -200,12 +199,8 @@ public class KahaDBPersistenceAdapter im
* @throws Exception
* @see org.apache.activemq.Service#stop()
*/
- public void stop() throws Exception {
- try {
- this.letter.stop();
- } finally {
- getLocker().stop();
- }
+ public void doStop(ServiceStopper stopper) throws Exception {
+ this.letter.stop();
}
/**
@@ -486,17 +481,13 @@ public class KahaDBPersistenceAdapter im
}
/**
- * @return the databaseLockedWaitDelay
- */
- public int getDatabaseLockedWaitDelay() {
- return letter.getDatabaseLockedWaitDelay();
- }
-
- /**
+ * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
+ *
* @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
*/
- public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
- letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
+ @Deprecated
+ public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
+ getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
}
public boolean getForceRecoverIndex() {
@@ -594,25 +585,16 @@ public class KahaDBPersistenceAdapter im
return rc;
}
- public void setLocker(Locker locker) {
- this.locker = locker;
- }
-
- protected Locker getLocker() throws IOException {
- if (this.locker == null) {
- this.locker = createDefaultLocker();
- }
- return this.locker;
- }
-
- protected Locker createDefaultLocker() throws IOException {
+ public Locker createDefaultLocker() throws IOException {
SharedFileLocker locker = new SharedFileLocker();
locker.configure(this);
- locker.setLockAcquireSleepInterval(getDatabaseLockedWaitDelay());
return locker;
}
@Override
+ public void init() throws Exception {}
+
+ @Override
public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
return "KahaDBPersistenceAdapter[" + path + "]";
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Sep 11 13:17:39 2012
@@ -221,7 +221,6 @@ public abstract class MessageDatabase ex
private int indexCacheSize = 10000;
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = false;
- private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
protected boolean forceRecoverIndex = false;
private final Object checkpointThreadLock = new Object();
private boolean rewriteOnRedelivery = false;
@@ -2308,20 +2307,6 @@ public abstract class MessageDatabase ex
this.directoryArchive = directoryArchive;
}
- /**
- * @return the databaseLockedWaitDelay
- */
- public int getDatabaseLockedWaitDelay() {
- return this.databaseLockedWaitDelay;
- }
-
- /**
- * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
- */
- public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
- this.databaseLockedWaitDelay = databaseLockedWaitDelay;
- }
-
public boolean isRewriteOnRedelivery() {
return rewriteOnRedelivery;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java Tue Sep 11 13:17:39 2012
@@ -51,6 +51,7 @@ public abstract class ServiceSupport imp
boolean success = false;
stopped.set(false);
try {
+ preStart();
doStart();
success = true;
} finally {
@@ -70,6 +71,8 @@ public abstract class ServiceSupport imp
doStop(stopper);
} catch (Exception e) {
stopper.onException(this, e);
+ } finally {
+ postStop(stopper);
}
stopped.set(true);
started.set(false);
@@ -110,7 +113,23 @@ public abstract class ServiceSupport imp
this.serviceListeners.remove(l);
}
+ /**
+ *
+ * Hook for operations performed after stopping the service (e.g. releasing locks)
+ *
+ * @throws Exception
+ */
+ protected void postStop(ServiceStopper stopper) throws Exception {}
+
protected abstract void doStop(ServiceStopper stopper) throws Exception;
+ /**
+ *
+ * Hook for operations performed before starting the service (e.g. acquiring locks)
+ *
+ * @throws Exception
+ */
+ protected void preStart() throws Exception {}
+
protected abstract void doStart() throws Exception;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java Tue Sep 11 13:17:39 2012
@@ -17,14 +17,11 @@
package org.apache.activemq.broker.ft;
import java.io.IOException;
-import java.sql.Connection;
import java.util.concurrent.TimeUnit;
-import junit.framework.Test;
+
import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +31,7 @@ public class DbRestartJDBCQueueMasterSla
@Override
protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
super.configureJdbcPersistenceAdapter(persistenceAdapter);
- persistenceAdapter.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
+ persistenceAdapter.getLocker().setLockAcquireSleepInterval(getLockAcquireSleepInterval());
persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
persistenceAdapter.setLocker(new LeaseDatabaseLocker());
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java Tue Sep 11 13:17:39 2012
@@ -28,9 +28,10 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class DbRestartJDBCQueueTest exte
topic = false;
verbose = true;
// startup db
- sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
+ sharedDs = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
broker = new BrokerService();
@@ -65,9 +66,9 @@ public class DbRestartJDBCQueueTest exte
broker.setDeleteAllMessagesOnStartup(true);
JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
persistenceAdapter.setDataSource(sharedDs);
- persistenceAdapter.setUseDatabaseLock(false);
+ persistenceAdapter.setUseLock(false);
persistenceAdapter.setLockKeepAlivePeriod(500);
- persistenceAdapter.setLockAcquireSleepInterval(500);
+ persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
broker.setPersistenceAdapter(persistenceAdapter);
broker.start();
super.setUp();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java Tue Sep 11 13:17:39 2012
@@ -28,8 +28,9 @@ import javax.sql.DataSource;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
@@ -39,7 +40,7 @@ public class JDBCQueueMasterSlaveTest ex
protected void setUp() throws Exception {
// startup db
- sharedDs = new SyncDataSource((EmbeddedDataSource)new DataSourceSupport().getDataSource());
+ sharedDs = new SyncDataSource((EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()));
super.setUp();
}
@@ -96,7 +97,7 @@ public class JDBCQueueMasterSlaveTest ex
protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
persistenceAdapter.setLockKeepAlivePeriod(500);
- persistenceAdapter.setLockAcquireSleepInterval(500);
+ persistenceAdapter.getLocker().setLockAcquireSleepInterval(500);
}
protected DataSource getExistingDataSource() throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java Tue Sep 11 13:17:39 2012
@@ -347,7 +347,7 @@ public class FailoverStaticNetworkTest {
brokerA.setBrokerName("Pair");
brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker"));
- ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
+ ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000);
brokerA.start();
brokerA.waitUntilStopped();
@@ -378,7 +378,7 @@ public class FailoverStaticNetworkTest {
// so they can coexist in local jmx we set the object name b/c the brokername identifies the shared store
brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+ JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker"));
- ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).setDatabaseLockedWaitDelay(1000);
+ ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000);
brokerA1.start();
brokerA1.waitUntilStopped();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java?rev=1383400&r1=1383399&r2=1383400&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java Tue Sep 11 13:17:39 2012
@@ -164,7 +164,7 @@ public class JDBCCommitExceptionTest ext
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
- jdbc.setUseDatabaseLock(false);
+ jdbc.setUseLock(false);
jdbc.deleteAllMessages();
broker.setPersistenceAdapter(jdbc);