You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/07 21:34:10 UTC

svn commit: r961484 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb: KahaDBStore.java MessageDatabase.java

Author: rajdavies
Date: Wed Jul  7 19:34:09 2010
New Revision: 961484

URL: http://svn.apache.org/viewvc?rev=961484&view=rev
Log:
Improve concurrency

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=961484&r1=961483&r2=961484&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Wed Jul  7 19:34:09 2010
@@ -389,7 +389,8 @@ public class KahaDBStore extends Message
             // operations... but for now we must
             // externally synchronize...
             Location location;
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 location = pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
                     public Location execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -400,6 +401,8 @@ public class KahaDBStore extends Message
                         return sd.orderIndex.get(tx, sequence).location;
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
             if (location == null) {
                 return null;
@@ -411,7 +414,8 @@ public class KahaDBStore extends Message
         public int getMessageCount() throws IOException {
             try {
                 lockAsyncJobQueue();
-                synchronized (indexMutex) {
+                indexLock.readLock().lock();
+                try {
                     return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                         public Integer execute(Transaction tx) throws IOException {
                             // Iterate through all index entries to get a count
@@ -427,6 +431,8 @@ public class KahaDBStore extends Message
                             return rc;
                         }
                     });
+                }finally {
+                    indexLock.readLock().unlock();
                 }
             } finally {
                 unlockAsyncJobQueue();
@@ -435,7 +441,8 @@ public class KahaDBStore extends Message
 
         @Override
         public boolean isEmpty() throws IOException {
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 return pageFile.tx().execute(new Transaction.CallableClosure<Boolean, IOException>() {
                     public Boolean execute(Transaction tx) throws IOException {
                         // Iterate through all index entries to get a count of
@@ -444,11 +451,14 @@ public class KahaDBStore extends Message
                         return sd.locationIndex.isEmpty(tx);
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
         }
 
         public void recover(final MessageRecoveryListener listener) throws Exception {
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -459,13 +469,16 @@ public class KahaDBStore extends Message
                         }
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
         }
 
         long cursorPos = 0;
 
         public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -486,6 +499,8 @@ public class KahaDBStore extends Message
                         }
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
         }
 
@@ -503,13 +518,16 @@ public class KahaDBStore extends Message
                 // operations... but for now we must
                 // externally synchronize...
                 Long location;
-                synchronized (indexMutex) {
+                indexLock.readLock().lock();
+                try {
                     location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
                         public Long execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
                             return sd.messageIdIndex.get(tx, key);
                         }
                     });
+                }finally {
+                    indexLock.readLock().unlock();
                 }
                 if (location != null) {
                     cursorPos = location + 1;
@@ -638,7 +656,8 @@ public class KahaDBStore extends Message
         public SubscriptionInfo[] getAllSubscriptions() throws IOException {
 
             final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -652,6 +671,8 @@ public class KahaDBStore extends Message
                         }
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
 
             SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()];
@@ -661,7 +682,8 @@ public class KahaDBStore extends Message
 
         public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>() {
                     public SubscriptionInfo execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -673,13 +695,16 @@ public class KahaDBStore extends Message
                                 .getSubscriptionInfo().newInput()));
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
         }
 
         public int getMessageCount(String clientId, String subscriptionName) throws IOException {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
                     public Integer execute(Transaction tx) throws IOException {
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -716,13 +741,16 @@ public class KahaDBStore extends Message
                         return counter;
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
         }
 
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
                 throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -736,13 +764,16 @@ public class KahaDBStore extends Message
                         }
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
         }
 
         public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned,
                 final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 pageFile.tx().execute(new Transaction.Closure<Exception>() {
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
@@ -768,19 +799,24 @@ public class KahaDBStore extends Message
                         }
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
         }
 
         public void resetBatching(String clientId, String subscriptionName) {
             try {
                 final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
-                synchronized (indexMutex) {
+                indexLock.writeLock().lock();
+                try {
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         public void execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
                             sd.subscriptionCursors.remove(subscriptionKey);
                         }
                     });
+                }finally {
+                    indexLock.writeLock().unlock();
                 }
             } catch (IOException e) {
                 throw new RuntimeException(e);
@@ -827,7 +863,8 @@ public class KahaDBStore extends Message
     public Set<ActiveMQDestination> getDestinations() {
         try {
             final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
-            synchronized (indexMutex) {
+            indexLock.readLock().lock();
+            try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
@@ -852,6 +889,8 @@ public class KahaDBStore extends Message
                         return isEmptyTopic;
                     }
                 });
+            }finally {
+                indexLock.readLock().unlock();
             }
             return rc;
         } catch (IOException e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=961484&r1=961483&r2=961484&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Jul  7 19:34:09 2010
@@ -35,6 +35,9 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.ConnectionId;
@@ -193,7 +196,8 @@ public class MessageDatabase extends Ser
     }
 
 	private void loadPageFile() throws IOException {
-		synchronized (indexMutex) {
+	    this.indexLock.writeLock().lock();
+	    try {
 		    final PageFile pageFile = getPageFile();
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -232,6 +236,8 @@ public class MessageDatabase extends Ser
                     }
                 }
             });
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
 	}
 	
@@ -307,7 +313,8 @@ public class MessageDatabase extends Ser
 
     public void load() throws IOException {
     	
-        synchronized (indexMutex) {
+        this.indexLock.writeLock().lock();
+        try {
             lock();
             if (deleteAllMessages) {
                 getJournal().start();
@@ -321,7 +328,8 @@ public class MessageDatabase extends Ser
 
 	    	open();
 	        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
-
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
 
     }
@@ -329,7 +337,8 @@ public class MessageDatabase extends Ser
     
 	public void close() throws IOException, InterruptedException {
 		if( opened.compareAndSet(true, false)) {
-	        synchronized (indexMutex) {
+		    this.indexLock.writeLock().lock();
+	        try {
 	            pageFile.tx().execute(new Transaction.Closure<IOException>() {
 	                public void execute(Transaction tx) throws IOException {
 	                    checkpointUpdate(tx, true);
@@ -337,6 +346,8 @@ public class MessageDatabase extends Ser
 	            });
 	            pageFile.unload();
 	            metadata = new Metadata();
+	        }finally {
+	            this.indexLock.writeLock().unlock();
 	        }
 	        journal.close();
 	        checkpointThread.join();
@@ -346,7 +357,8 @@ public class MessageDatabase extends Ser
 	}
 	
     public void unload() throws IOException, InterruptedException {
-        synchronized (indexMutex) {
+        this.indexLock.writeLock().lock();
+        try {
             if( pageFile != null && pageFile.isLoaded() ) {
                 metadata.state = CLOSED_STATE;
                 metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
@@ -357,6 +369,8 @@ public class MessageDatabase extends Ser
                     }
                 });
             }
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
         close();
     }
@@ -389,7 +403,8 @@ public class MessageDatabase extends Ser
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
-        synchronized (indexMutex) {
+        this.indexLock.writeLock().lock();
+        try {
             
 	        long start = System.currentTimeMillis();        
 	        Location recoveryPosition = getRecoveryPosition();
@@ -413,6 +428,8 @@ public class MessageDatabase extends Ser
                     recoverIndex(tx);
                 }
             });
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
     }
     
@@ -559,7 +576,8 @@ public class MessageDatabase extends Ser
 	private Location lastRecoveryPosition;
 
 	public void incrementalRecover() throws IOException {
-        synchronized (indexMutex) {
+	    this.indexLock.writeLock().lock();
+        try {
 	        if( nextRecoveryPosition == null ) {
 	        	if( lastRecoveryPosition==null ) {
 	        		nextRecoveryPosition = getRecoveryPosition();
@@ -574,6 +592,8 @@ public class MessageDatabase extends Ser
 	            process(message, lastRecoveryPosition);            
 	            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
 	        }
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
 	}
 	
@@ -600,7 +620,8 @@ public class MessageDatabase extends Ser
 
     protected void checkpointCleanup(final boolean cleanup) throws IOException {
     	long start;
-        synchronized (indexMutex) {
+    	this.indexLock.writeLock().lock();
+        try {
             start = System.currentTimeMillis();
         	if( !opened.get() ) {
         		return;
@@ -610,6 +631,8 @@ public class MessageDatabase extends Ser
                     checkpointUpdate(tx, cleanup);
                 }
             });
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
     	long end = System.currentTimeMillis();
     	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
@@ -619,13 +642,16 @@ public class MessageDatabase extends Ser
 
     
 	public void checkpoint(Callback closure) throws Exception {
-        synchronized (indexMutex) {
+	    this.indexLock.writeLock().lock();
+        try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     checkpointUpdate(tx, false);
                 }
             });
             closure.execute();
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
 	}
 
@@ -662,8 +688,11 @@ public class MessageDatabase extends Ser
         		LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
         	}
     
-            synchronized (indexMutex) {
+        	this.indexLock.writeLock().lock();
+            try {
             	metadata.lastUpdate = location;
+            }finally {
+                this.indexLock.writeLock().unlock();
             }
             if (!checkpointThread.isAlive()) {
                 LOG.info("KahaDB: Recovering checkpoint thread after exception");
@@ -752,12 +781,15 @@ public class MessageDatabase extends Ser
                 inflightTx.add(new AddOpperation(command, location));
             }
         } else {
-            synchronized (indexMutex) {
+            this.indexLock.writeLock().lock();
+            try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         upadateIndex(tx, command, location);
                     }
                 });
+            }finally {
+                this.indexLock.writeLock().unlock();
             }
         }
     }
@@ -769,34 +801,43 @@ public class MessageDatabase extends Ser
                 inflightTx.add(new RemoveOpperation(command, location));
             }
         } else {
-            synchronized (indexMutex) {
+            this.indexLock.writeLock().lock();
+            try {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         updateIndex(tx, command, location);
                     }
                 });
+            }finally {
+                this.indexLock.writeLock().unlock();
             }
         }
 
     }
 
     protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
-        synchronized (indexMutex) {
+        this.indexLock.writeLock().lock();
+        try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     updateIndex(tx, command, location);
                 }
             });
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
     }
 
     protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
-        synchronized (indexMutex) {
+        this.indexLock.writeLock().lock();
+        try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     updateIndex(tx, command, location);
                 }
             });
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
     }
 
@@ -814,7 +855,8 @@ public class MessageDatabase extends Ser
         }
 
         final ArrayList<Operation> messagingTx = inflightTx;
-        synchronized (indexMutex) {
+        this.indexLock.writeLock().lock();
+        try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     for (Operation op : messagingTx) {
@@ -822,6 +864,8 @@ public class MessageDatabase extends Ser
                     }
                 }
             });
+        }finally {
+            this.indexLock.writeLock().unlock();
         }
     }
 
@@ -849,7 +893,7 @@ public class MessageDatabase extends Ser
     // These methods do the actual index updates.
     // /////////////////////////////////////////////////////////////////
 
-    protected final Object indexMutex = new Object();
+    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
 	private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
 
     void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {