You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/15 14:30:45 UTC

svn commit: r1482789 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/transaction/ activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/

Author: chirino
Date: Wed May 15 12:30:45 2013
New Revision: 1482789

URL: http://svn.apache.org/r1482789
Log:
Fixes AMQ-4535: ActiveMQ configured with LevelDB fails on commit when accessed by PutGet from IBM Perf Harness

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1482789&r1=1482788&r2=1482789&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Wed May 15 12:30:45 2013
@@ -786,33 +786,26 @@ public class Queue extends BaseDestinati
 
         @Override
         public void afterCommit() throws Exception {
-            LinkedList<Transaction> orderedWork = null;
+            LinkedList<Transaction> orderedWork = new LinkedList<Transaction>();;
             // use existing object to sync orderIndexUpdates that can be reassigned
             synchronized (sendLock) {
-                if (transaction == orderIndexUpdates.peek()) {
-                    orderedWork = orderIndexUpdates;
-                    orderIndexUpdates = new LinkedList<Transaction>();
-
-                    // talking all the ordered work means that earlier
-                    // and later threads do nothing.
-                    // this avoids contention/race on the sendLock that
-                    // guards the actual work.
+                Transaction next = orderIndexUpdates.peek();
+                while( next!=null && next.isCommitted() ) {
+                    orderedWork.addLast(orderIndexUpdates.removeFirst());
+                    next = orderIndexUpdates.peek();
                 }
             }
             // do the ordered work
-            if (orderedWork != null) {
+            if (!orderedWork.isEmpty()) {
                 sendLock.lockInterruptibly();
                 try {
                     for (Transaction tx : orderedWork) {
                         sendSyncs.get(tx).processSend();
+                        sendSyncs.remove(tx);
                     }
                 } finally {
                     sendLock.unlock();
                 }
-                for (Transaction tx : orderedWork) {
-                    sendSyncs.get(tx).processSent();
-                }
-                sendSyncs.remove(transaction);
             }
         }
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=1482789&r1=1482788&r2=1482789&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java Wed May 15 12:30:45 2013
@@ -40,6 +40,7 @@ public abstract class Transaction {
     public static final byte IN_USE_STATE = 1; // can go to: 2,3
     public static final byte PREPARED_STATE = 2; // can go to: 3
     public static final byte FINISHED_STATE = 3;
+    boolean committed = false;
 
     private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
     private byte state = START_STATE;
@@ -64,6 +65,14 @@ public abstract class Transaction {
         this.state = state;
     }
 
+    public boolean isCommitted() {
+        return committed;
+    }
+
+    public void setCommitted(boolean committed) {
+        this.committed = committed;
+    }
+
     public void addSynchronization(Synchronization r) {
         synchronizations.add(r);
         if (state == START_STATE) {
@@ -182,6 +191,7 @@ public abstract class Transaction {
 
     protected void doPostCommit() throws XAException {
         try {
+            setCommitted(true);
             fireAfterCommit();
         } catch (Throwable e) {
             // I guess this could happen. Post commit task failed

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1482789&r1=1482788&r2=1482789&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Wed May 15 12:30:45 2013
@@ -298,11 +298,13 @@ class DelayableUOW(val manager:DBManager
   }
 
   def dequeue(expectedQueueKey:Long, id:MessageId) = {
-    val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
-    assert(queueKey == expectedQueueKey)
-    val entry = QueueEntryRecord(id, queueKey, queueSeq)
-    this.synchronized {
-      getAction(id).dequeues += entry
+    if( id.getEntryLocator != null ) {
+      val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
+      assert(queueKey == expectedQueueKey)
+      val entry = QueueEntryRecord(id, queueKey, queueSeq)
+      this.synchronized {
+        getAction(id).dequeues += entry
+      }
     }
     countDownFuture
   }