You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/07 21:34:10 UTC
svn commit: r961484 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb:
KahaDBStore.java MessageDatabase.java
Author: rajdavies
Date: Wed Jul 7 19:34:09 2010
New Revision: 961484
URL: http://svn.apache.org/viewvc?rev=961484&view=rev
Log:
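Improve concurrency: replace the synchronized indexMutex monitor with a ReentrantReadWriteLock (indexLock), taking the read lock on index query/recovery paths and the write lock on index update paths.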
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=961484&r1=961483&r2=961484&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Wed Jul 7 19:34:09 2010
@@ -389,7 +389,8 @@ public class KahaDBStore extends Message
// operations... but for now we must
// externally synchronize...
Location location;
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
public Location execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
@@ -400,6 +401,8 @@ public class KahaDBStore extends Message
return sd.orderIndex.get(tx, sequence).location;
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
if (location == null) {
return null;
@@ -411,7 +414,8 @@ public class KahaDBStore extends Message
public int getMessageCount() throws IOException {
try {
lockAsyncJobQueue();
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count
@@ -427,6 +431,8 @@ public class KahaDBStore extends Message
return rc;
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
} finally {
unlockAsyncJobQueue();
@@ -435,7 +441,8 @@ public class KahaDBStore extends Message
@Override
public boolean isEmpty() throws IOException {
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
public Boolean execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of
@@ -444,11 +451,14 @@ public class KahaDBStore extends Message
return sd.locationIndex.isEmpty(tx);
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
}
public void recover(final MessageRecoveryListener listener) throws Exception {
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
@@ -459,13 +469,16 @@ public class KahaDBStore extends Message
}
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
}
long cursorPos = 0;
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
@@ -486,6 +499,8 @@ public class KahaDBStore extends Message
}
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
}
@@ -503,13 +518,16 @@ public class KahaDBStore extends Message
// operations... but for now we must
// externally synchronize...
Long location;
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
public Long execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
return sd.messageIdIndex.get(tx, key);
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
if (location != null) {
cursorPos = location + 1;
@@ -638,7 +656,8 @@ public class KahaDBStore extends Message
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
@@ -652,6 +671,8 @@ public class KahaDBStore extends Message
}
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
@@ -661,7 +682,8 @@ public class KahaDBStore extends Message
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
public SubscriptionInfo execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
@@ -673,13 +695,16 @@ public class KahaDBStore extends Message
.getSubscriptionInfo().newInput()));
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
}
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
@@ -716,13 +741,16 @@ public class KahaDBStore extends Message
return counter;
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
}
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
@@ -736,13 +764,16 @@ public class KahaDBStore extends Message
}
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
}
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<Exception>() {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
@@ -768,19 +799,24 @@ public class KahaDBStore extends Message
}
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
}
public void resetBatching(String clientId, String subscriptionName) {
try {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
- synchronized (indexMutex) {
+ indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
sd.subscriptionCursors.remove(subscriptionKey);
}
});
+ }finally {
+ indexLock.writeLock().unlock();
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -827,7 +863,8 @@ public class KahaDBStore extends Message
public Set<ActiveMQDestination> getDestinations() {
try {
final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
- synchronized (indexMutex) {
+ indexLock.readLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
@@ -852,6 +889,8 @@ public class KahaDBStore extends Message
return isEmptyTopic;
}
});
+ }finally {
+ indexLock.readLock().unlock();
}
return rc;
} catch (IOException e) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=961484&r1=961483&r2=961484&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Jul 7 19:34:09 2010
@@ -35,6 +35,9 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ConnectionId;
@@ -193,7 +196,8 @@ public class MessageDatabase extends Ser
}
private void loadPageFile() throws IOException {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
final PageFile pageFile = getPageFile();
pageFile.load();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -232,6 +236,8 @@ public class MessageDatabase extends Ser
}
}
});
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
@@ -307,7 +313,8 @@ public class MessageDatabase extends Ser
public void load() throws IOException {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
lock();
if (deleteAllMessages) {
getJournal().start();
@@ -321,7 +328,8 @@ public class MessageDatabase extends Ser
open();
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
-
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
@@ -329,7 +337,8 @@ public class MessageDatabase extends Ser
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, true);
@@ -337,6 +346,8 @@ public class MessageDatabase extends Ser
});
pageFile.unload();
metadata = new Metadata();
+ }finally {
+ this.indexLock.writeLock().unlock();
}
journal.close();
checkpointThread.join();
@@ -346,7 +357,8 @@ public class MessageDatabase extends Ser
}
public void unload() throws IOException, InterruptedException {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
if( pageFile != null && pageFile.isLoaded() ) {
metadata.state = CLOSED_STATE;
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
@@ -357,6 +369,8 @@ public class MessageDatabase extends Ser
}
});
}
+ }finally {
+ this.indexLock.writeLock().unlock();
}
close();
}
@@ -389,7 +403,8 @@ public class MessageDatabase extends Ser
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition();
@@ -413,6 +428,8 @@ public class MessageDatabase extends Ser
recoverIndex(tx);
}
});
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
@@ -559,7 +576,8 @@ public class MessageDatabase extends Ser
private Location lastRecoveryPosition;
public void incrementalRecover() throws IOException {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
if( nextRecoveryPosition == null ) {
if( lastRecoveryPosition==null ) {
nextRecoveryPosition = getRecoveryPosition();
@@ -574,6 +592,8 @@ public class MessageDatabase extends Ser
process(message, lastRecoveryPosition);
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
}
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
@@ -600,7 +620,8 @@ public class MessageDatabase extends Ser
protected void checkpointCleanup(final boolean cleanup) throws IOException {
long start;
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
start = System.currentTimeMillis();
if( !opened.get() ) {
return;
@@ -610,6 +631,8 @@ public class MessageDatabase extends Ser
checkpointUpdate(tx, cleanup);
}
});
+ }finally {
+ this.indexLock.writeLock().unlock();
}
long end = System.currentTimeMillis();
if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
@@ -619,13 +642,16 @@ public class MessageDatabase extends Ser
public void checkpoint(Callback closure) throws Exception {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, false);
}
});
closure.execute();
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
@@ -662,8 +688,11 @@ public class MessageDatabase extends Ser
LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
}
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
metadata.lastUpdate = location;
+ }finally {
+ this.indexLock.writeLock().unlock();
}
if (!checkpointThread.isAlive()) {
LOG.info("KahaDB: Recovering checkpoint thread after exception");
@@ -752,12 +781,15 @@ public class MessageDatabase extends Ser
inflightTx.add(new AddOpperation(command, location));
}
} else {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location);
}
});
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
}
@@ -769,34 +801,43 @@ public class MessageDatabase extends Ser
inflightTx.add(new RemoveOpperation(command, location));
}
} else {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
});
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
}
protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
});
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
});
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
@@ -814,7 +855,8 @@ public class MessageDatabase extends Ser
}
final ArrayList<Operation> messagingTx = inflightTx;
- synchronized (indexMutex) {
+ this.indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) {
@@ -822,6 +864,8 @@ public class MessageDatabase extends Ser
}
}
});
+ }finally {
+ this.indexLock.writeLock().unlock();
}
}
@@ -849,7 +893,7 @@ public class MessageDatabase extends Ser
// These methods do the actual index updates.
// /////////////////////////////////////////////////////////////////
- protected final Object indexMutex = new Object();
+ protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {