You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/13 12:54:59 UTC

svn commit: r963674 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb: KahaDBStore.java KahaDBTransactionStore.java MessageDatabase.java

Author: gtully
Date: Tue Jul 13 10:54:59 2010
New Revision: 963674

URL: http://svn.apache.org/viewvc?rev=963674&view=rev
Log:
further reduction in contention on inflight transacitons in kahaDB store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=963674&r1=963673&r2=963674&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Jul 13 10:54:59 2010
@@ -956,14 +956,6 @@ public class KahaDBStore extends Message
         }
         KahaTransactionInfo rc = new KahaTransactionInfo();
 
-        // Link it up to the previous record that was part of the transaction.
-        synchronized (inflightTransactions) {
-            ArrayList<Operation> tx = inflightTransactions.get(txid);
-            if (tx != null) {
-                rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
-            }
-        }
-
         if (txid.isLocalTransaction()) {
             LocalTransactionId t = (LocalTransactionId) txid;
             KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=963674&r1=963673&r2=963674&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Tue Jul 13 10:54:59 2010
@@ -289,7 +289,7 @@ public class KahaDBTransactionStore impl
     public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
         // All the inflight transactions get rolled back..
         // inflightTransactions.clear();
-        for (Map.Entry<TransactionId, ArrayList<Operation>> entry : theStore.preparedTransactions.entrySet()) {
+        for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
             XATransactionId xid = (XATransactionId) entry.getKey();
             ArrayList<Message> messageList = new ArrayList<Message>();
             ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=963674&r1=963673&r2=963674&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Jul 13 10:54:59 2010
@@ -26,17 +26,7 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -302,10 +292,7 @@ public class MessageDatabase extends Ser
         checkpointThread.setDaemon(true);
         checkpointThread.start();
 	}
-	
-	/**
-	 * @throws IOException
-	 */
+
 	public void open() throws IOException {
 		if( opened.compareAndSet(false, true) ) {
             getJournal().start();
@@ -731,7 +718,6 @@ public class MessageDatabase extends Ser
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
      * during a recovery process.
-     * @param done 
      */
     public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
     	if (before != null) {
@@ -860,10 +846,8 @@ public class MessageDatabase extends Ser
 
     protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
-            synchronized (inflightTransactions) {
-                ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
-                inflightTx.add(new AddOpperation(command, location));
-            }
+            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+            inflightTx.add(new AddOpperation(command, location));
         } else {
             this.indexLock.writeLock().lock();
             try {
@@ -880,10 +864,8 @@ public class MessageDatabase extends Ser
 
     protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
-            synchronized (inflightTransactions) {
-                ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
-                inflightTx.add(new RemoveOpperation(command, location));
-            }
+           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+           inflightTx.add(new RemoveOpperation(command, location));
         } else {
             this.indexLock.writeLock().lock();
             try {
@@ -927,7 +909,7 @@ public class MessageDatabase extends Ser
 
     protected void process(KahaCommitCommand command, Location location) throws IOException {
         TransactionId key = key(command.getTransactionInfo());
-        ArrayList<Operation> inflightTx = null;
+        List<Operation> inflightTx;
         synchronized (inflightTransactions) {
             inflightTx = inflightTransactions.remove(key);
             if (inflightTx == null) {
@@ -938,7 +920,7 @@ public class MessageDatabase extends Ser
             return;
         }
 
-        final ArrayList<Operation> messagingTx = inflightTx;
+        final List<Operation> messagingTx = inflightTx;
         this.indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -954,9 +936,9 @@ public class MessageDatabase extends Ser
     }
 
     protected void process(KahaPrepareCommand command, Location location) {
+        TransactionId key = key(command.getTransactionInfo());
         synchronized (inflightTransactions) {
-            TransactionId key = key(command.getTransactionInfo());
-            ArrayList<Operation> tx = inflightTransactions.remove(key);
+            List<Operation> tx = inflightTransactions.remove(key);
             if (tx != null) {
                 preparedTransactions.put(key, tx);
             }
@@ -964,9 +946,9 @@ public class MessageDatabase extends Ser
     }
 
     protected void process(KahaRollbackCommand command, Location location) {
+        TransactionId key = key(command.getTransactionInfo());
         synchronized (inflightTransactions) {
-            TransactionId key = key(command.getTransactionInfo());
-            ArrayList<Operation> tx = inflightTransactions.remove(key);
+            List<Operation> tx = inflightTransactions.remove(key);
             if (tx == null) {
                 preparedTransactions.remove(key);
             }
@@ -1496,15 +1478,18 @@ public class MessageDatabase extends Ser
     // /////////////////////////////////////////////////////////////////
     // Transaction related implementation methods.
     // /////////////////////////////////////////////////////////////////
-    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
-    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
+    protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
+    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
  
-    private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
+    private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
         TransactionId key = key(info);
-        ArrayList<Operation> tx = inflightTransactions.get(key);
-        if (tx == null) {
-            tx = new ArrayList<Operation>();
-            inflightTransactions.put(key, tx);
+        List<Operation> tx;
+        synchronized (inflightTransactions) {
+            tx = inflightTransactions.get(key);
+            if (tx == null) {
+                tx = Collections.synchronizedList(new ArrayList<Operation>());
+                inflightTransactions.put(key, tx);
+            }
         }
         return tx;
     }