You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/13 12:54:59 UTC
svn commit: r963674 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb:
KahaDBStore.java KahaDBTransactionStore.java MessageDatabase.java
Author: gtully
Date: Tue Jul 13 10:54:59 2010
New Revision: 963674
URL: http://svn.apache.org/viewvc?rev=963674&view=rev
Log:
further reduction in contention on inflight transactions in KahaDB store
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=963674&r1=963673&r2=963674&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Jul 13 10:54:59 2010
@@ -956,14 +956,6 @@ public class KahaDBStore extends Message
}
KahaTransactionInfo rc = new KahaTransactionInfo();
- // Link it up to the previous record that was part of the transaction.
- synchronized (inflightTransactions) {
- ArrayList<Operation> tx = inflightTransactions.get(txid);
- if (tx != null) {
- rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
- }
- }
-
if (txid.isLocalTransaction()) {
LocalTransactionId t = (LocalTransactionId) txid;
KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=963674&r1=963673&r2=963674&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Tue Jul 13 10:54:59 2010
@@ -289,7 +289,7 @@ public class KahaDBTransactionStore impl
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
// All the inflight transactions get rolled back..
// inflightTransactions.clear();
- for (Map.Entry<TransactionId, ArrayList<Operation>> entry : theStore.preparedTransactions.entrySet()) {
+ for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId) entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>();
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=963674&r1=963673&r2=963674&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Jul 13 10:54:59 2010
@@ -26,17 +26,7 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -302,10 +292,7 @@ public class MessageDatabase extends Ser
checkpointThread.setDaemon(true);
checkpointThread.start();
}
-
- /**
- * @throws IOException
- */
+
public void open() throws IOException {
if( opened.compareAndSet(false, true) ) {
getJournal().start();
@@ -731,7 +718,6 @@ public class MessageDatabase extends Ser
* to a JournalMessage which is logged to the journal and then the data from
* the JournalMessage is used to update the index just like it would be done
* during a recovery process.
- * @param done
*/
public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
if (before != null) {
@@ -860,10 +846,8 @@ public class MessageDatabase extends Ser
protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
- synchronized (inflightTransactions) {
- ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
- inflightTx.add(new AddOpperation(command, location));
- }
+ List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+ inflightTx.add(new AddOpperation(command, location));
} else {
this.indexLock.writeLock().lock();
try {
@@ -880,10 +864,8 @@ public class MessageDatabase extends Ser
protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
- synchronized (inflightTransactions) {
- ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
- inflightTx.add(new RemoveOpperation(command, location));
- }
+ List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+ inflightTx.add(new RemoveOpperation(command, location));
} else {
this.indexLock.writeLock().lock();
try {
@@ -927,7 +909,7 @@ public class MessageDatabase extends Ser
protected void process(KahaCommitCommand command, Location location) throws IOException {
TransactionId key = key(command.getTransactionInfo());
- ArrayList<Operation> inflightTx = null;
+ List<Operation> inflightTx;
synchronized (inflightTransactions) {
inflightTx = inflightTransactions.remove(key);
if (inflightTx == null) {
@@ -938,7 +920,7 @@ public class MessageDatabase extends Ser
return;
}
- final ArrayList<Operation> messagingTx = inflightTx;
+ final List<Operation> messagingTx = inflightTx;
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -954,9 +936,9 @@ public class MessageDatabase extends Ser
}
protected void process(KahaPrepareCommand command, Location location) {
+ TransactionId key = key(command.getTransactionInfo());
synchronized (inflightTransactions) {
- TransactionId key = key(command.getTransactionInfo());
- ArrayList<Operation> tx = inflightTransactions.remove(key);
+ List<Operation> tx = inflightTransactions.remove(key);
if (tx != null) {
preparedTransactions.put(key, tx);
}
@@ -964,9 +946,9 @@ public class MessageDatabase extends Ser
}
protected void process(KahaRollbackCommand command, Location location) {
+ TransactionId key = key(command.getTransactionInfo());
synchronized (inflightTransactions) {
- TransactionId key = key(command.getTransactionInfo());
- ArrayList<Operation> tx = inflightTransactions.remove(key);
+ List<Operation> tx = inflightTransactions.remove(key);
if (tx == null) {
preparedTransactions.remove(key);
}
@@ -1496,15 +1478,18 @@ public class MessageDatabase extends Ser
// /////////////////////////////////////////////////////////////////
// Transaction related implementation methods.
// /////////////////////////////////////////////////////////////////
- protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
- protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
+ protected final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
+ protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
- private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
+ private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
TransactionId key = key(info);
- ArrayList<Operation> tx = inflightTransactions.get(key);
- if (tx == null) {
- tx = new ArrayList<Operation>();
- inflightTransactions.put(key, tx);
+ List<Operation> tx;
+ synchronized (inflightTransactions) {
+ tx = inflightTransactions.get(key);
+ if (tx == null) {
+ tx = Collections.synchronizedList(new ArrayList<Operation>());
+ inflightTransactions.put(key, tx);
+ }
}
return tx;
}