You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/12 23:54:00 UTC
svn commit: r603762 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/
main/java/org/apache/activemq/store/amq/
main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activ...
Author: rajdavies
Date: Wed Dec 12 14:53:59 2007
New Revision: 603762
URL: http://svn.apache.org/viewvc?rev=603762&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1507
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Wed Dec 12 14:53:59 2007
@@ -272,8 +272,22 @@
*/
long size();
+ /**
+ * @return true if persistent indexes are used by default
+ */
public boolean isPersistentIndex();
+ /**
+ * Set a persistent index as the default if the parameter is true
+ * @param persistentIndex
+ */
public void setPersistentIndex(boolean persistentIndex);
+
+ /**
+ * An explicit call to initialize - this will also be called
+ * implicitly for any other operation on the store.
+ * @throws IOException
+ */
+ public void initialize() throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Wed Dec 12 14:53:59 2007
@@ -445,10 +445,10 @@
}
if (!initialized) {
- LOG.info("Kaha Store using data directory " + directory);
+
lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
lock();
-
+ LOG.info("Kaha Store using data directory " + directory);
DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
rootIndexManager = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
IndexItem mapRoot = new IndexItem();
@@ -486,6 +486,7 @@
if (!BROKEN_FILE_LOCK) {
lock = lockFile.getChannel().tryLock();
if (lock == null) {
+ initialized=false;
throw new StoreLockedExcpetion("Kaha Store " + directory.getName()
+ " is already opened by another application");
} else {
@@ -493,6 +494,7 @@
}
}
} else { // already locked
+ initialized=false;
throw new StoreLockedExcpetion("Kaha Store " + directory.getName()
+ " is already opened by this application.");
}
@@ -501,7 +503,7 @@
private synchronized void unlock() throws IOException {
if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
- System.getProperties().remove(getPropertyKey());
+ System.clearProperty(getPropertyKey());
if (lock.isValid()) {
lock.release();
}
@@ -510,7 +512,6 @@
}
private String getPropertyKey() throws IOException {
- // Is replaceAll() needed? Should test without it.
return getClass().getName() + ".lock." + directory.getCanonicalPath();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Dec 12 14:53:59 2007
@@ -18,6 +18,8 @@
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
@@ -83,6 +85,10 @@
private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
+ private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
+ private static final boolean BROKEN_FILE_LOCK;
+ private static final boolean DISABLE_LOCKING;
+
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
@@ -112,7 +118,10 @@
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
-
+ private String directoryPath = "";
+ private RandomAccessFile lockFile;
+ private FileLock lock;
+ private boolean disableLocking = DISABLE_LOCKING;
public String getBrokerName() {
return this.brokerName;
@@ -141,13 +150,17 @@
if (brokerService != null) {
this.directory = brokerService.getBrokerDataDirectory();
} else {
+
this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
this.directory = new File(directory, "amqstore");
+ this.directoryPath=directory.getAbsolutePath();
}
}
if (this.directoryArchive == null) {
this.directoryArchive = new File(this.directory,"archive");
}
+ lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
+ lock();
LOG.info("AMQStore starting using directory: " + directory);
this.directory.mkdirs();
if (archiveDataLogs) {
@@ -240,6 +253,11 @@
if (!started.compareAndSet(true, false)) {
return;
}
+ if (lockFile != null) {
+ lockFile.close();
+ lockFile = null;
+ }
+ unlock();
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (this) {
Scheduler.cancel(periodicCheckpointTask);
@@ -818,7 +836,15 @@
public void setArchiveDataLogs(boolean archiveDataLogs) {
this.archiveDataLogs = archiveDataLogs;
- }
+ }
+
+ public boolean isDisableLocking() {
+ return disableLocking;
+ }
+
+ public void setDisableLocking(boolean disableLocking) {
+ this.disableLocking = disableLocking;
+ }
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
@@ -836,4 +862,72 @@
set.remove(dataFileId);
}
}
+
+
+
+ protected void lock() throws IOException, InterruptedException {
+ boolean logged = false;
+ boolean aquiredLock = false;
+ do {
+ if (doLock()) {
+ aquiredLock = true;
+ } else {
+ if (!logged) {
+ LOG.warn("Waiting to Lock the Store " + getDirectory());
+ logged = true;
+ }
+ Thread.sleep(1000);
+ }
+
+ if (aquiredLock && logged) {
+ LOG.info("Aquired lock for AMQ Store" + getDirectory());
+ }
+
+ } while (!aquiredLock && !disableLocking);
+ }
+
+ private synchronized void unlock() throws IOException {
+ if (!disableLocking && (null != directory) && (null != lock)) {
+ System.clearProperty(getPropertyKey());
+ if (lock.isValid()) {
+ lock.release();
+ }
+ lock = null;
+ }
+ }
+
+
+ protected boolean doLock() throws IOException {
+ boolean result = true;
+ if (!disableLocking && directory != null && lock == null) {
+ String key = getPropertyKey();
+ String property = System.getProperty(key);
+ if (null == property) {
+ if (!BROKEN_FILE_LOCK) {
+ lock = lockFile.getChannel().tryLock();
+ if (lock == null) {
+ result = false;
+ } else {
+ System.setProperty(key, new Date().toString());
+ }
+ }
+ } else { // already locked
+ result = false;
+ }
+ }
+ return result;
+ }
+
+ private String getPropertyKey() throws IOException {
+ return getClass().getName() + ".lock." + directory.getCanonicalPath();
+ }
+
+ static {
+ BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+ + ".FileLockBroken",
+ "false"));
+ DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+ + ".DisableLocking",
+ "false"));
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Wed Dec 12 14:53:59 2007
@@ -56,6 +56,7 @@
return recoverMessage(message);
} else {
LOG.error("Message id " + ref + " could not be recovered from the data store!");
+ Thread.dumpStack();
}
return false;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Dec 12 14:53:59 2007
@@ -39,6 +39,7 @@
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
+import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
@@ -53,7 +54,7 @@
- private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
+ private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class);
private static final String STORE_STATE = "store-state";
private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
private static final Integer INDEX_VERSION = new Integer(3);
@@ -87,7 +88,7 @@
@Override
public synchronized void start() throws Exception {
super.start();
- Store store = getStateStore();
+ Store store = getStateStore();
boolean empty = store.getMapContainerIds().isEmpty();
stateMap = store.getMapContainer("state", STORE_STATE);
stateMap.load();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Wed Dec 12 14:53:59 2007
@@ -43,7 +43,7 @@
maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
list = new LinkedList<BitArray>();
for (int i = 0; i < maxNumberOfArrays; i++) {
- list.add(new BitArray());
+ list.add(null);
}
}
@@ -130,6 +130,10 @@
bin = list.size() - 1;
}
answer = list.get(bin);
+ if (answer == null) {
+ answer = new BitArray();
+ list.set(bin, answer);
+ }
}
return answer;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Wed Dec 12 14:53:59 2007
@@ -38,20 +38,15 @@
protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
protected void setUp() throws Exception {
+ messageCount = 10000;
if (System.getProperty("basedir") == null) {
File file = new File(".");
System.setProperty("basedir", file.getAbsolutePath());
}
failureCount = super.messageCount / 2;
super.topic = isTopic();
- BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
- brokerFactory.afterPropertiesSet();
- master = brokerFactory.getBroker();
- brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
- brokerFactory.afterPropertiesSet();
- slave = brokerFactory.getBroker();
- master.start();
- slave.start();
+ createMaster();
+ createSlave();
// wait for things to connect
Thread.sleep(1000);
super.setUp();
@@ -87,5 +82,19 @@
protected boolean isTopic() {
return false;
+ }
+
+ protected void createMaster() throws Exception {
+ BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
+ brokerFactory.afterPropertiesSet();
+ master = brokerFactory.getBroker();
+ master.start();
+ }
+
+ protected void createSlave() throws Exception {
+ BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
+ brokerFactory.afterPropertiesSet();
+ slave = brokerFactory.getBroker();
+ slave.start();
}
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java?rev=603762&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java Wed Dec 12 14:53:59 2007
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+
+public class QueueMasterSlaveTestUsingSharedFileTest extends
+ QueueMasterSlaveTest {
+
+ protected String getSlaveXml() {
+ return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
+ }
+
+ protected String getMasterXml() {
+ return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
+ }
+
+ protected void createSlave() throws Exception {
+ new Thread(new Runnable() {
+
+ public void run() {
+ try {
+ QueueMasterSlaveTestUsingSharedFileTest.super.createSlave();
+ } catch (Exception e) {
+
+ e.printStackTrace();
+ }
+
+ }
+
+ }).start();
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java
------------------------------------------------------------------------------
svn:eol-style = native