You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/15 14:30:45 UTC
svn commit: r1482789 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/region/
activemq-broker/src/main/java/org/apache/activemq/transaction/
activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/
Author: chirino
Date: Wed May 15 12:30:45 2013
New Revision: 1482789
URL: http://svn.apache.org/r1482789
Log:
Fixes AMQ-4535: Activemq configured with leveldb commit fail when accessed by PutGet from IBM Perf Harness
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1482789&r1=1482788&r2=1482789&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Wed May 15 12:30:45 2013
@@ -786,33 +786,26 @@ public class Queue extends BaseDestinati
@Override
public void afterCommit() throws Exception {
- LinkedList<Transaction> orderedWork = null;
+ LinkedList<Transaction> orderedWork = new LinkedList<Transaction>();;
// use existing object to sync orderIndexUpdates that can be reassigned
synchronized (sendLock) {
- if (transaction == orderIndexUpdates.peek()) {
- orderedWork = orderIndexUpdates;
- orderIndexUpdates = new LinkedList<Transaction>();
-
- // talking all the ordered work means that earlier
- // and later threads do nothing.
- // this avoids contention/race on the sendLock that
- // guards the actual work.
+ Transaction next = orderIndexUpdates.peek();
+ while( next!=null && next.isCommitted() ) {
+ orderedWork.addLast(orderIndexUpdates.removeFirst());
+ next = orderIndexUpdates.peek();
}
}
// do the ordered work
- if (orderedWork != null) {
+ if (!orderedWork.isEmpty()) {
sendLock.lockInterruptibly();
try {
for (Transaction tx : orderedWork) {
sendSyncs.get(tx).processSend();
+ sendSyncs.remove(tx);
}
} finally {
sendLock.unlock();
}
- for (Transaction tx : orderedWork) {
- sendSyncs.get(tx).processSent();
- }
- sendSyncs.remove(transaction);
}
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=1482789&r1=1482788&r2=1482789&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java Wed May 15 12:30:45 2013
@@ -40,6 +40,7 @@ public abstract class Transaction {
public static final byte IN_USE_STATE = 1; // can go to: 2,3
public static final byte PREPARED_STATE = 2; // can go to: 3
public static final byte FINISHED_STATE = 3;
+ boolean committed = false;
private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private byte state = START_STATE;
@@ -64,6 +65,14 @@ public abstract class Transaction {
this.state = state;
}
+ public boolean isCommitted() {
+ return committed;
+ }
+
+ public void setCommitted(boolean committed) {
+ this.committed = committed;
+ }
+
public void addSynchronization(Synchronization r) {
synchronizations.add(r);
if (state == START_STATE) {
@@ -182,6 +191,7 @@ public abstract class Transaction {
protected void doPostCommit() throws XAException {
try {
+ setCommitted(true);
fireAfterCommit();
} catch (Throwable e) {
// I guess this could happen. Post commit task failed
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1482789&r1=1482788&r2=1482789&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed May 15 12:30:45 2013
@@ -298,11 +298,13 @@ class DelayableUOW(val manager:DBManager
}
def dequeue(expectedQueueKey:Long, id:MessageId) = {
- val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
- assert(queueKey == expectedQueueKey)
- val entry = QueueEntryRecord(id, queueKey, queueSeq)
- this.synchronized {
- getAction(id).dequeues += entry
+ if( id.getEntryLocator != null ) {
+ val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
+ assert(queueKey == expectedQueueKey)
+ val entry = QueueEntryRecord(id, queueKey, queueSeq)
+ this.synchronized {
+ getAction(id).dequeues += entry
+ }
}
countDownFuture
}