You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/19 13:48:38 UTC

svn commit: r946135 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/Queue.java store/AbstractMessageStore.java store/memory/MemoryTransactionStore.java

Author: gtully
Date: Wed May 19 11:48:38 2010
New Revision: 946135

URL: http://svn.apache.org/viewvc?rev=946135&view=rev
Log:
Resolve failure of org.apache.activemq.store.jdbc.JDBCStoreOrderTest: acks were issued asynchronously while in a transaction, and the transaction store's message store proxy was missing the async implementations. Belt and braces: fix the async ack and add sync implementations of the async methods to the proxy. Follows the async changes from http://svn.apache.org/viewvc?rev=945102&view=rev

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=946135&r1=946134&r2=946135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed May 19 11:48:38 2010
@@ -666,7 +666,11 @@ public class Queue extends BaseDestinati
                 ack.setLastMessageId(node.getMessageId());
                 ack.setMessageCount(1);
             }
-            store.removeAsyncMessage(context, ack);
+            if (context.isInTransaction()) {
+                store.removeMessage(context, ack);
+            } else {
+                store.removeAsyncMessage(context, ack);
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=946135&r1=946134&r2=946135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Wed May 19 11:48:38 2010
@@ -28,7 +28,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.usage.MemoryUsage;
 
 abstract public class AbstractMessageStore implements MessageStore {
-    static final FutureTask<Object> FUTURE;
+    public static final FutureTask<Object> FUTURE;
     protected final ActiveMQDestination destination;
 
     public AbstractMessageStore(ActiveMQDestination destination) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=946135&r1=946134&r2=946135&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Wed May 19 11:48:38 2010
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
 import javax.transaction.xa.XAException;
 
@@ -28,6 +29,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.AbstractMessageStore;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.ProxyMessageStore;
@@ -132,9 +134,18 @@ public class MemoryTransactionStore impl
                 MemoryTransactionStore.this.addMessage(getDelegate(), send);
             }
 
+            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+                MemoryTransactionStore.this.addMessage(getDelegate(), message);
+                return AbstractMessageStore.FUTURE;
+             }
+             
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
+             
+            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
+            }
         };
     }
 
@@ -144,9 +155,18 @@ public class MemoryTransactionStore impl
                 MemoryTransactionStore.this.addMessage(getDelegate(), send);
             }
 
+            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+                MemoryTransactionStore.this.addMessage(getDelegate(), message);
+                return AbstractMessageStore.FUTURE;
+             }
+
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
+            
+            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
+            }
         };
     }