You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 16:42:53 UTC

svn commit: r1295627 [8/12] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbsto...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java Thu Mar  1 15:42:44 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.server.logging.a
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.NullRootMessageLogger;
 
 /**
@@ -49,10 +50,7 @@ import org.apache.qpid.server.logging.Nu
 public class CurrentActorTest extends BaseConnectionActorTestCase
 {
     //Set this to be a reasonably large number
-    int THREADS = 10;
-
-    // Record any exceptions that are thrown by the threads
-    Exception[] _errors = new Exception[THREADS];
+    private static final int THREADS = 10;
 
     /**
      * Test that CurrentActor behaves as LIFO queue.
@@ -161,19 +159,11 @@ public class CurrentActorTest extends Ba
     public void testThreadLocal()
     {
 
-        new Runnable(){
-            public void run()
-            {
-                System.out.println(_errors[0]);
-            }
-        };
-
         // Setup the threads
-        Thread[] threads = new Thread[THREADS];
+        LogMessagesWithAConnectionActor[] threads = new LogMessagesWithAConnectionActor[THREADS];
         for (int count = 0; count < THREADS; count++)
         {
-            Runnable test = new LogMessagesWithAConnectionActor(count);
-            threads[count] = new Thread(test);
+            threads[count] = new LogMessagesWithAConnectionActor();
         }
 
         //Run the threads
@@ -198,10 +188,10 @@ public class CurrentActorTest extends Ba
         // Verify that none of the tests threw an exception
         for (int count = 0; count < THREADS; count++)
         {
-            if (_errors[count] != null)
+            if (threads[count].getException() != null)
             {
-                _errors[count].printStackTrace();
-                fail("Error occured in thread:" + count);
+                threads[count].getException().printStackTrace();
+                fail("Error occured in thread:" + count + "("+threads[count].getException()+")");
             }
         }
     }
@@ -210,13 +200,12 @@ public class CurrentActorTest extends Ba
      * Creates a new ConnectionActor and logs the given number of messages
      * before removing the actor and validating that there is no set actor.
      */
-    public class LogMessagesWithAConnectionActor implements Runnable
+    public class LogMessagesWithAConnectionActor extends Thread
     {
-        int count;
+        Throwable _exception;
 
-        LogMessagesWithAConnectionActor(int count)
+        public LogMessagesWithAConnectionActor()
         {
-            this.count = count;
         }
 
         public void run()
@@ -227,6 +216,7 @@ public class CurrentActorTest extends Ba
             //fixme reminder that we need a better approach for broker testing.
             try
             {
+                LogActor defaultActor = CurrentActor.get();
 
                 AMQPConnectionActor actor = new AMQPConnectionActor(getSession(),
                                                                     new NullRootMessageLogger());
@@ -237,20 +227,26 @@ public class CurrentActorTest extends Ba
                 sendTestLogMessage(CurrentActor.get());
 
                 // Verify it was the same actor as we set earlier
-                assertEquals("Retrieved actor is not as expected ",
-                             actor, CurrentActor.get());
+                if(!actor.equals(CurrentActor.get()))
+                   throw new IllegalArgumentException("Retrieved actor is not as expected ");
 
                 // Verify that removing the actor works for this thread
                 CurrentActor.remove();
 
-                assertNull("CurrentActor should be null", CurrentActor.get());
+                if(CurrentActor.get() != defaultActor)
+                   throw new IllegalArgumentException("CurrentActor ("+CurrentActor.get()+") should be default actor" + defaultActor);
             }
-            catch (Exception e)
+            catch (Throwable e)
             {
-                _errors[count] = e;
+                _exception = e;
             }
 
         }
+
+        public Throwable getException()
+        {
+            return _exception;
+        }
     }
 
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Thu Mar  1 15:42:44 2012
@@ -64,17 +64,17 @@ public class AMQPriorityQueueTest extend
         ArrayList<QueueEntry> msgs = _subscription.getMessages();
         try
         {
-            assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber());
-            assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber());
-            assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber());
-
-            assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber());
-            assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber());
-            assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber());
-
-            assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber());
-            assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber());
-            assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber());
+            assertEquals(1L, msgs.get(0).getMessage().getMessageNumber());
+            assertEquals(6L, msgs.get(1).getMessage().getMessageNumber());
+            assertEquals(8L, msgs.get(2).getMessage().getMessageNumber());
+
+            assertEquals(2L, msgs.get(3).getMessage().getMessageNumber());
+            assertEquals(5L, msgs.get(4).getMessage().getMessageNumber());
+            assertEquals(7L, msgs.get(5).getMessage().getMessageNumber());
+
+            assertEquals(3L, msgs.get(6).getMessage().getMessageNumber());
+            assertEquals(4L, msgs.get(7).getMessage().getMessageNumber());
+            assertEquals(9L, msgs.get(8).getMessage().getMessageNumber());
         }
         catch (AssertionFailedError afe)
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu Mar  1 15:42:44 2012
@@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -168,22 +167,22 @@ public class MockAMQQueue implements AMQ
 
     public UUID getId()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public QueueConfigType getConfigType()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public ConfiguredObject getParent()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public boolean isDurable()
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return false;
     }
 
     public boolean isAutoDelete()
@@ -199,7 +198,7 @@ public class MockAMQQueue implements AMQ
 
     public AMQShortString getOwner()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public void setVirtualHost(VirtualHost virtualhost)
@@ -219,22 +218,22 @@ public class MockAMQQueue implements AMQ
 
     public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public void unregisterSubscription(Subscription subscription) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public int getConsumerCount()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public int getActiveConsumerCount()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public boolean hasExclusiveSubscriber()
@@ -244,37 +243,37 @@ public class MockAMQQueue implements AMQ
 
     public boolean isUnused()
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return false;
     }
 
     public boolean isEmpty()
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return false;
     }
 
     public int getMessageCount()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public int getUndeliveredMessageCount()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public long getQueueDepth()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public long getReceivedMessageCount()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public long getOldestMessageArrivalTime()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public boolean isDeleted()
@@ -297,59 +296,58 @@ public class MockAMQQueue implements AMQ
     }
 
 
+    public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
+    {
+    }
+
     public void requeue(QueueEntry entry)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public void requeue(QueueEntryImpl storeContext, Subscription subscription)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public void dequeue(QueueEntry entry, Subscription sub)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return false;
     }
 
     public void addQueueDeleteTask(Task task)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public void removeQueueDeleteTask(final Task task)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public List<QueueEntry> getMessagesOnTheQueue()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public List<Long> getMessagesOnTheQueue(int num)
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public List<Long> getMessagesOnTheQueue(int num, int offest)
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public QueueEntry getMessageOnTheQueue(long messageId)
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
@@ -359,146 +357,137 @@ public class MockAMQQueue implements AMQ
 
     public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public long getMaximumMessageSize()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public void setMaximumMessageSize(long value)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public long getMaximumMessageCount()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public void setMaximumMessageCount(long value)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public long getMaximumQueueDepth()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public void setMaximumQueueDepth(long value)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public long getMaximumMessageAge()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public void setMaximumMessageAge(long maximumMessageAge)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public boolean getBlockOnQueueFull()
-    {
-        return false;
-    }
-
-    public void setBlockOnQueueFull(boolean block)
-    {
+      
     }
 
     public long getMinimumAlertRepeatGap()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public void deleteMessageFromTop()
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public long clearQueue()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
 
     public void checkMessageStatus() throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public Set<NotificationCheck> getNotificationChecks()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public void flushSubscription(Subscription sub) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public void deliverAsync(Subscription sub)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public void deliverAsync()
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public void stop()
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public boolean isExclusive()
     {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
+        return false;
     }
 
     public Exchange getAlternateExchange()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public void setAlternateExchange(Exchange exchange)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public Map<String, Object> getArguments()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
-    public void checkCapacity(AMQChannel channel)
+    public void checkCapacity(AMQSessionModel channel)
     {
     }
 
     public ManagedObject getManagedObject()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public int compareTo(AMQQueue o)
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public void setMinimumAlertRepeatGap(long value)
@@ -508,22 +497,22 @@ public class MockAMQQueue implements AMQ
 
     public long getCapacity()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public void setCapacity(long capacity)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public long getFlowResumeCapacity()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return 0;
     }
 
     public void setFlowResumeCapacity(long flowResumeCapacity)
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+      
     }
 
     public void configure(ConfigurationPlugin config)
@@ -533,7 +522,7 @@ public class MockAMQQueue implements AMQ
 
     public ConfigurationPlugin getConfiguration()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public AuthorizationHolder getAuthorizationHolder()
@@ -612,20 +601,20 @@ public class MockAMQQueue implements AMQ
 
     }
 
-    @Override
     public int getMaximumDeliveryCount()
     {
         return 0;
     }
 
-    @Override
     public void setMaximumDeliveryCount(int maximumDeliveryCount)
     {
     }
 
-    @Override
     public void setAlternateExchange(String exchangeName)
     {
     }
 
+    public void visit(final Visitor visitor)
+    {
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java Thu Mar  1 15:42:44 2012
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.framing.FieldTable;
 
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -107,7 +106,17 @@ public class MockStoredMessage implement
         return src.limit();
     }
 
-    public TransactionLog.StoreFuture flushToStore()
+
+
+    public ByteBuffer getContent(int offsetInMessage, int size)
+    {
+        ByteBuffer buf = ByteBuffer.allocate(size);
+        getContent(offsetInMessage, buf);
+        buf.position(0);
+        return  buf;
+    }
+
+    public MessageStore.StoreFuture flushToStore()
     {
         return MessageStore.IMMEDIATE_FUTURE;
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Thu Mar  1 15:42:44 2012
@@ -164,7 +164,7 @@ public abstract class QueueEntryListTest
         final QueueEntry head = getTestList().getHead();
         assertNull("Head entry should not contain an actual message", head.getMessage());
         assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head)
-                        .getMessage().getMessageNumber().longValue());
+                        .getMessage().getMessageNumber());
     }
 
     /**

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Thu Mar  1 15:42:44 2012
@@ -649,9 +649,7 @@ public class SimpleAMQQueueTest extends 
                                         public void onRollback()
                                         {
                                         }
-                                    });
-
-
+                                    }, 0L);
 
         // Check that it is enqueued
         AMQQueue data = _store.getMessages().get(1L);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java Thu Mar  1 15:42:44 2012
@@ -162,8 +162,8 @@ public class SimpleQueueEntryListTest ex
         while (entry != null)
         {           
             assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted());
-            assertNotNull("QueueEntry was not found in the list of remaining entries", 
-                    remainingMessages.get(entry.getMessage().getMessageNumber().intValue()));
+            assertNotNull("QueueEntry "+entry.getMessage().getMessageNumber()+" was not found in the list of remaining entries " + remainingMessages,
+                    remainingMessages.get((int)(entry.getMessage().getMessageNumber())));
 
             count++;
             entry = entry.getNextNode();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Thu Mar  1 15:42:44 2012
@@ -317,7 +317,7 @@ public class SortedQueueEntryListTest ex
         assertEquals("Sorted queue entry value is not as expected",
                         expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));
         assertEquals("Sorted queue entry id is not as expected",
-                        Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber());
+                        expectedMessageId, entry.getMessage().getMessageNumber());
     }
 
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Thu Mar  1 15:42:44 2012
@@ -558,7 +558,7 @@ public class MessageStoreTest extends In
     /**
      * Delete the Store Environment path
      *
-     * @param configuration The configuration that contains the store environment path.
+     * @param environmentPath The store environment path to delete.
      */
     private void cleanup(File environmentPath)
     {
@@ -636,7 +636,7 @@ public class MessageStoreTest extends In
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
-            });
+            }, 0L);
         }
     }
 
@@ -710,7 +710,7 @@ public class MessageStoreTest extends In
 
             if (queue.isDurable() && !queue.isAutoDelete())
             {
-                getVirtualHost().getMessageStore().createQueue(queue, queueArguments);
+                getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments);
             }
         }
         catch (AMQException e)
@@ -754,7 +754,7 @@ public class MessageStoreTest extends In
             getVirtualHost().getExchangeRegistry().registerExchange(exchange);
             if (durable)
             {
-                getVirtualHost().getMessageStore().createExchange(exchange);
+                getVirtualHost().getDurableConfigurationStore().createExchange(exchange);
             }
         }
         catch (AMQException e)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Thu Mar  1 15:42:44 2012
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.exchange.Exchange;
@@ -42,18 +43,11 @@ import java.nio.ByteBuffer;
  */
 public class SkeletonMessageStore implements MessageStore
 {
-    private final AtomicLong _messageId = new AtomicLong(1);
-
-    public void configure(String base, Configuration config) throws Exception
-    {
-    }
-
     public void configureConfigStore(String name,
                           ConfigurationRecoveryHandler recoveryHandler,
                           Configuration config,
                           LogSubject logSubject) throws Exception
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public void configureMessageStore(String name,
@@ -61,7 +55,6 @@ public class SkeletonMessageStore implem
                                       Configuration config,
                                       LogSubject logSubject) throws Exception
     {
-        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public void close() throws Exception
@@ -70,31 +63,28 @@ public class SkeletonMessageStore implem
 
     public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
-    public void removeMessage(Long messageId)
-    {
-    }
 
     public void createExchange(Exchange exchange) throws AMQStoreException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+
     }
 
     public void removeExchange(Exchange exchange) throws AMQStoreException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+
     }
 
     public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+
     }
 
     public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+
     }
 
     public void createQueue(AMQQueue queue) throws AMQStoreException
@@ -105,63 +95,11 @@ public class SkeletonMessageStore implem
     {
     }
 
-
-
-
-    public List<AMQQueue> createQueues() throws AMQException
-    {
-        return null;
-    }
-
-    public Long getNewMessageId()
-    {
-        return _messageId.getAndIncrement();
-    }
-
-    public void storeContentBodyChunk(
-            Long messageId,
-            int index,
-            ContentChunk contentBody,
-            boolean lastContentBody) throws AMQException
-    {
-
-    }
-
-    public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
-    {
-
-    }
-
-    public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
-    {
-        return null;
-    }
-
-    public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
-    {
-        return null;
-    }
-
     public boolean isPersistent()
     {
         return false;
     }
 
-    public void storeMessageHeader(Long messageNumber, ServerMessage message)
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public void storeContent(Long messageNumber, long offset, ByteBuffer body)
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public ServerMessage getMessage(Long messageNumber)
-    {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
     public void removeQueue(final AMQQueue queue) throws AMQStoreException
     {
 
@@ -172,7 +110,7 @@ public class SkeletonMessageStore implem
                                         Configuration storeConfiguration,
                                         LogSubject logSubject) throws Exception
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+
     }
 
     public Transaction newTransaction()
@@ -180,19 +118,19 @@ public class SkeletonMessageStore implem
         return new Transaction()
         {
 
-            public void enqueueMessage(TransactionLogResource  queue, Long messageId) throws AMQStoreException
+            public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
             {
-                //To change body of implemented methods use File | Settings | File Templates.
+
             }
 
-            public void dequeueMessage(TransactionLogResource  queue, Long messageId) throws AMQStoreException
+            public void dequeueMessage(TransactionLogResource  queue, EnqueableMessage message) throws AMQStoreException
             {
-                //To change body of implemented methods use File | Settings | File Templates.
+
             }
 
             public void commitTran() throws AMQStoreException
             {
-                //To change body of implemented methods use File | Settings | File Templates.
+
             }
 
             public StoreFuture commitTranAsync() throws AMQStoreException
@@ -213,7 +151,7 @@ public class SkeletonMessageStore implem
 
             public void abortTran() throws AMQStoreException
             {
-                //To change body of implemented methods use File | Settings | File Templates.
+
             }
         };
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java Thu Mar  1 15:42:44 2012
@@ -82,6 +82,12 @@ public class TestMemoryMessageStore exte
             return _storedMessage.getContent(offsetInMessage, dst);
         }
 
+
+        public ByteBuffer getContent(int offsetInMessage, int size)
+        {
+            return _storedMessage.getContent(offsetInMessage, size);
+        }
+
         public StoreFuture flushToStore()
         {
             return _storedMessage.flushToStore();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Thu Mar  1 15:42:44 2012
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 /**
@@ -66,14 +68,14 @@ public class TestableMemoryMessageStore 
 
     private class TestableTransaction implements Transaction
     {
-        public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
-            getMessages().put(messageId, (AMQQueue)queue);
+            getMessages().put(message.getMessageNumber(), (AMQQueue)queue);
         }
 
-        public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+        public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
-            getMessages().remove(messageId);
+            getMessages().remove(message.getMessageNumber());
         }
 
         public void commitTran() throws AMQStoreException
@@ -143,6 +145,12 @@ public class TestableMemoryMessageStore 
             return _storedMessage.getContent(offsetInMessage, dst);
         }
 
+
+        public ByteBuffer getContent(int offsetInMessage, int size)
+        {
+            return _storedMessage.getContent(offsetInMessage, size);
+        }
+
         public StoreFuture flushToStore()
         {
             return _storedMessage.flushToStore();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Thu Mar  1 15:42:44 2012
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.MockAMQQueue;
 import org.apache.qpid.server.queue.MockQueueEntry;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -44,7 +44,7 @@ public class AutoCommitTransactionTest e
 {
     private ServerTransaction _transaction = null;  // Class under test
     
-    private TransactionLog _transactionLog;
+    private MessageStore _transactionLog;
     private AMQQueue _queue;
     private List<AMQQueue> _queues;
     private Collection<QueueEntry> _queueEntries;
@@ -137,7 +137,7 @@ public class AutoCommitTransactionTest e
         _message = createTestMessage(false);
         _queues = createTestBaseQueues(new boolean[] {false, false, false});
         
-        _transaction.enqueue(_queues, _message, _action);
+        _transaction.enqueue(_queues, _message, _action, 0L);
 
         assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -157,7 +157,7 @@ public class AutoCommitTransactionTest e
         _message = createTestMessage(true);
         _queues = createTestBaseQueues(new boolean[] {false, false, false});
         
-        _transaction.enqueue(_queues, _message, _action);
+        _transaction.enqueue(_queues, _message, _action, 0L);
 
         assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -175,7 +175,7 @@ public class AutoCommitTransactionTest e
         _message = createTestMessage(true);
         _queues = createTestBaseQueues(new boolean[] {false, true, false, true});
         
-        _transaction.enqueue(_queues, _message, _action);
+        _transaction.enqueue(_queues, _message, _action, 0L);
 
         assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
@@ -198,7 +198,7 @@ public class AutoCommitTransactionTest e
         
         try
         {
-            _transaction.enqueue(_queues, _message, _action);
+            _transaction.enqueue(_queues, _message, _action, 0L);
             fail("Exception not thrown");
         }
         catch (RuntimeException re)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Thu Mar  1 15:42:44 2012
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.MockAMQQueue;
 import org.apache.qpid.server.queue.MockQueueEntry;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -51,7 +51,7 @@ public class LocalTransactionTest extend
     private MockAction _action1;
     private MockAction _action2;
     private MockStoreTransaction _storeTransaction;
-    private TransactionLog _transactionLog;
+    private MessageStore _transactionLog;
 
 
     @Override
@@ -140,7 +140,7 @@ public class LocalTransactionTest extend
         _message = createTestMessage(false);
         _queues = createTestBaseQueues(new boolean[] {false, false, false});
         
-        _transaction.enqueue(_queues, _message, _action1);
+        _transaction.enqueue(_queues, _message, _action1, 0L);
 
         assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -156,7 +156,7 @@ public class LocalTransactionTest extend
         _message = createTestMessage(true);
         _queues = createTestBaseQueues(new boolean[] {false, false, false});
         
-        _transaction.enqueue(_queues, _message, _action1);
+        _transaction.enqueue(_queues, _message, _action1, 0L);
   
         assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -173,7 +173,7 @@ public class LocalTransactionTest extend
         _message = createTestMessage(true);
         _queues = createTestBaseQueues(new boolean[] {false, true, false, true});
         
-        _transaction.enqueue(_queues, _message, _action1);
+        _transaction.enqueue(_queues, _message, _action1, 0L);
 
         assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
@@ -196,7 +196,7 @@ public class LocalTransactionTest extend
         
         try
         {
-            _transaction.enqueue(_queues, _message, _action1);
+            _transaction.enqueue(_queues, _message, _action1, 0L);
             fail("Exception not thrown");
         }
         catch (RuntimeException re)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Thu Mar  1 15:42:44 2012
@@ -27,11 +27,12 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.StoredMessage;
 
 /**
  * Mock Server Message allowing its persistent flag to be controlled from test.
  */
-class MockServerMessage implements ServerMessage<MockServerMessage>
+class MockServerMessage implements ServerMessage
 {
     /**
      *
@@ -83,6 +84,11 @@ class MockServerMessage implements Serve
         throw new NotImplementedException();
     }
 
+    public StoredMessage getStoredMessage()
+    {
+        throw new NotImplementedException();
+    }
+
     public long getExpiration()
     {
         throw new NotImplementedException();
@@ -93,12 +99,18 @@ class MockServerMessage implements Serve
         throw new NotImplementedException();
     }
 
+
+    public ByteBuffer getContent(int offset, int size)
+    {
+        throw new NotImplementedException();
+    }
+
     public long getArrivalTime()
     {
         throw new NotImplementedException();
     }
 
-    public Long getMessageNumber()
+    public long getMessageNumber()
     {
         return 0L;
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java Thu Mar  1 15:42:44 2012
@@ -24,11 +24,11 @@ import org.apache.commons.configuration.
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.TransactionLog.StoreFuture;
-import org.apache.qpid.server.store.TransactionLog.Transaction;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.store.MessageStore.StoreFuture;
+import org.apache.qpid.server.store.MessageStore.Transaction;
 
 /**
  * Mock implementation of a (Store) Transaction allow its state to be observed.
@@ -61,7 +61,7 @@ class MockStoreTransaction implements Tr
         return _state;
     }
 
-    public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+    public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
     {
         if (_throwExceptionOnQueueOp)
         {
@@ -82,7 +82,7 @@ class MockStoreTransaction implements Tr
         return _numberOfEnqueuedMessages;
     }
 
-    public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+    public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
     {
         if (_throwExceptionOnQueueOp)
         {
@@ -107,10 +107,33 @@ class MockStoreTransaction implements Tr
         _state = TransactionState.ABORTED;
     }
 
-    public static TransactionLog createTestTransactionLog(final MockStoreTransaction storeTransaction)
+    public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction)
     {
-        return new TransactionLog()
+        return new MessageStore()
         {
+            public void configureMessageStore(final String name,
+                                              final MessageStoreRecoveryHandler recoveryHandler,
+                                              final Configuration config,
+                                              final LogSubject logSubject) throws Exception
+            {
+                //TODO.
+            }
+
+            public void close() throws Exception
+            {
+                //TODO.
+            }
+
+            public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+            {
+                return null;  //TODO.
+            }
+
+            public boolean isPersistent()
+            {
+                return false;  //TODO.
+            }
+
             public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
                     Configuration storeConfiguration, LogSubject logSubject) throws Exception
             {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Thu Mar  1 15:42:44 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpid.server.binding.BindingFactory;
@@ -41,7 +42,6 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 
 
@@ -66,6 +66,11 @@ public class MockVirtualHost implements 
 
     }
 
+    public BrokerLink createBrokerConnection(final UUID id, final long createTime, final Map<String, String> arguments)
+    {
+        return null;
+    }
+
     public IApplicationRegistry getApplicationRegistry()
     {
         return null;
@@ -161,10 +166,6 @@ public class MockVirtualHost implements 
         return null;
     }
 
-    public TransactionLog getTransactionLog()
-    {
-        return null;
-    }
 
     public void removeBrokerConnection(BrokerLink brokerLink)
     {

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/velocity/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Mar  1 15:42:44 2012
@@ -0,0 +1,2 @@
+*.iml
+

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/xsl/qmf.xsl
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/xsl/qmf.xsl?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/xsl/qmf.xsl (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/xsl/qmf.xsl Thu Mar  1 15:42:44 2012
@@ -288,6 +288,17 @@ public class <xsl:value-of select="$Clas
             <xsl:apply-templates select="node()[name()='property' or name()='statistic']" mode="optionalPropertyPresence"/>
             <xsl:apply-templates select="node()[name()='property' or name()='statistic']" mode="encodeProperty"/>
         }
+
+        public String toString()
+        {
+            return "QMF<xsl:value-of select="@name"/>GetQueryResponseCommand{id=" + getObject().getId()   
+<xsl:for-each select="node()[name()='property' or name()='statistic']">
+<xsl:if test="@type!='hilo32' and @type!='mmaTime' ">
+                + ", <xsl:value-of select="@name"/>=" + getObject().get<xsl:call-template name="initCap"><xsl:with-param name="input"><xsl:value-of select="@name"/></xsl:with-param></xsl:call-template>()
+</xsl:if>
+</xsl:for-each>
+                   + "}";
+        }
     }
     
     
@@ -530,6 +541,11 @@ public class <xsl:value-of select="$Clas
             {
                 return obj.<xsl:value-of select="@name"/>( new <xsl:value-of select="$ClassName"/>ResponseCommandFactory(cmd)<xsl:if test="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]">, </xsl:if><xsl:apply-templates select="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]" mode="methodArgList"><xsl:with-param name="prefix">_</xsl:with-param></xsl:apply-templates> );
             }
+
+            public String toString()
+            {
+                return "<xsl:value-of select="$ClassName"/>["<xsl:for-each select="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]"><xsl:if test="preceding-sibling::node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]">+ ", "</xsl:if>+ "<xsl:value-of select="@name"/> = " + _<xsl:value-of select="@name"/> </xsl:for-each>+"]";
+            }
         }
         
         public final class <xsl:value-of select="$ClassName"/>ResponseCommandFactory

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps Thu Mar  1 15:42:44 2012
@@ -146,7 +146,7 @@ management-common.test.libs=${test.libs}
 ra.libs=${geronimo-j2ee} ${geronimo-jta} ${geronimo-jms} ${slf4j-api} ${geronimo-kernel} ${geronimo-openejb}
 
 # optional bdbstore module deps
-bdb-je=lib/bdbstore/je-4.0.117.jar
+bdb-je=lib/bdbstore/je-5.0.34.jar
 bdbstore.libs=${bdb-je}
 bdbstore.test.libs=${test.libs}
 

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/example/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Mar  1 15:42:44 2012
@@ -0,0 +1,2 @@
+*.iml
+

Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Mar  1 15:42:44 2012
@@ -0,0 +1,2 @@
+*.iml
+

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Mar  1 15:42:44 2012
@@ -308,7 +308,7 @@ public class AMQConnectionDelegate_8_0 i
         {
             AMQSession s = (AMQSession) it.next();
             // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
-            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted());
             s.resubscribe();
         }
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Mar  1 15:42:44 2012
@@ -829,7 +829,7 @@ public abstract class AMQDestination imp
         dest.setSubject(_subject);
         dest.setCreate(_create); 
         dest.setAssert(_assert); 
-        dest.setDelete(_create); 
+        dest.setDelete(_delete); 
         dest.setBrowseOnly(_browseOnly);
         dest.setAddressType(_addressType);
         dest.setTargetNode(_targetNode);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar  1 15:42:44 2012
@@ -256,7 +256,7 @@ public abstract class AMQSession<C exten
     protected AMQConnection _connection;
 
     /** Used to indicate whether or not this is a transactional session. */
-    protected boolean _transacted;
+    protected final boolean _transacted;
 
     /** Holds the sessions acknowledgement mode. */
     protected final int _acknowledgeMode;
@@ -371,7 +371,7 @@ public abstract class AMQSession<C exten
      * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
      * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
      */
-    private volatile boolean _usingDispatcherForCleanup;
+    protected volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, has been stopped. */
     private boolean _connectionStopped;
@@ -1583,6 +1583,11 @@ public abstract class AMQSession<C exten
         return _prefetchLowMark;
     }
 
+    public int getPrefetch()
+    {
+        return _prefetchHighMark;
+    }
+
     public AMQShortString getDefaultQueueExchangeName()
     {
         return _connection.getDefaultQueueExchangeName();
@@ -1614,7 +1619,24 @@ public abstract class AMQSession<C exten
         return _ticket;
     }
 
-    public boolean getTransacted()
+    /**
+     * Indicates whether the session is in transacted mode.
+     *
+     * @return true if the session is in transacted mode
+     * @throws javax.jms.IllegalStateException if the session is closed.
+     */
+    public boolean getTransacted() throws JMSException
+    {
+        // The Sun TCK checks that javax.jms.IllegalStateException is thrown for a closed session;
+        // this behavior is not documented anywhere else.
+        checkNotClosed();
+        return _transacted;
+    }
+
+    /**
+     * Indicates whether the session is in transacted mode; unlike {@link #getTransacted()}, performs no closed-session check.
+     */
+    public boolean isTransacted()
     {
         return _transacted;
     }
@@ -3047,7 +3069,7 @@ public abstract class AMQSession<C exten
      */
     public boolean prefetch()
     {
-        return getAMQConnection().getMaxPrefetch() > 0;
+        return _prefetchHighMark > 0;
     }
 
     /** Signifies that the session has pending sends to commit. */

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Mar  1 15:42:44 2012
@@ -30,7 +30,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -60,23 +59,7 @@ import org.apache.qpid.filter.MessageFil
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.*;
 import org.apache.qpid.util.Serial;
 import org.apache.qpid.util.Strings;
 import org.slf4j.Logger;
@@ -141,13 +124,13 @@ public class AMQSession_0_10 extends AMQ
 
     private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
     private TimerTask flushTask = null;
-    private RangeSet unacked = new RangeSet();
+    private RangeSet unacked = RangeSetFactory.createRangeSet();
     private int unackedCount = 0;    
 
     /**
      * Used to store the range of in tx messages
      */
-    private final RangeSet _txRangeSet = new RangeSet();
+    private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet();
     private int _txSize = 0;    
     //--- constructors
 
@@ -460,7 +443,7 @@ public class AMQSession_0_10 extends AMQ
     public void sendRecover() throws AMQException, FailoverException
     {
         // release all unacked messages
-        RangeSet all = new RangeSet();
+        RangeSet all = RangeSetFactory.createRangeSet();
         RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
         RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
         for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
@@ -483,7 +466,7 @@ public class AMQSession_0_10 extends AMQ
 
     private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags)
     {
-        RangeSet ranges = new RangeSet();
+        RangeSet ranges = RangeSetFactory.createRangeSet();
         while (true)
         {
             Long tag = messageTags.poll();
@@ -518,7 +501,7 @@ public class AMQSession_0_10 extends AMQ
     public void rejectMessage(long deliveryTag, boolean requeue)
     {
         // The value of requeue is always true
-        RangeSet ranges = new RangeSet();
+        RangeSet ranges = RangeSetFactory.createRangeSet();
         ranges.add((int) deliveryTag);
         flushProcessed(ranges, false);
         if (requeue)
@@ -812,11 +795,43 @@ public class AMQSession_0_10 extends AMQ
     {
         if (suspend)
         {
-            for (BasicMessageConsumer consumer : _consumers.values())
-            {
-                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
-                                             Option.UNRELIABLE);
-            }
+                synchronized (getMessageDeliveryLock())
+                {
+                    for (BasicMessageConsumer consumer : _consumers.values())
+	            {
+	                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+	                                             Option.UNRELIABLE);
+	                sync();
+	                List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+	                _prefetchedMessageTags.addAll(tags);
+	            }
+                }
+
+                _usingDispatcherForCleanup = true;
+                syncDispatchQueue();
+                _usingDispatcherForCleanup = false;
+
+                RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+		RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+		RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+					+ prefetched.size());
+
+		for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+		{
+			Range range = deliveredIter.next();
+			all.add(range);
+		}
+
+		for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+		{
+			Range range = prefetchedIter.next();
+			all.add(range);
+		}
+
+		flushProcessed(all, false);
+		getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
+		getQpidSession().messageRelease(prefetched);
+		sync();
         }
         else
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Mar  1 15:42:44 2012
@@ -21,6 +21,8 @@
 package org.apache.qpid.client;
 
 
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.ArrayList;
 import java.util.Map;
 
@@ -80,7 +82,7 @@ import org.apache.qpid.transport.Transpo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
+public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -96,8 +98,8 @@ public final class AMQSession_0_8 extend
      * @param defaultPrefetchHighMark The maximum number of messages to prefetch before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
      */
-    AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
-               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+    protected AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+                             MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
     {
 
          super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
@@ -150,7 +152,7 @@ public final class AMQSession_0_8 extend
             _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
         }
 
-        getProtocolHandler().writeFrame(ackFrame);
+        getProtocolHandler().writeFrame(ackFrame, !isTransacted());
         _unacknowledgedMessageTags.remove(deliveryTag);
     }
 
@@ -512,7 +514,7 @@ public final class AMQSession_0_8 extend
                     // Bounced message is processed here, away from the mina thread
                     AbstractJMSMessage bouncedMessage =
                             _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
-                                                                  msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+                                                                  msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache);
                     AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
                     AMQShortString reason = msg.getReplyText();
                     _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
@@ -572,6 +574,16 @@ public final class AMQSession_0_8 extend
                  }, _connection).execute();
     }
 
+    public DestinationCache<AMQQueue> getQueueDestinationCache()
+    {
+        return _queueDestinationCache;
+    }
+
+    public DestinationCache<AMQTopic> getTopicDestinationCache()
+    {
+        return _topicDestinationCache;
+    }
+
     class QueueDeclareOkHandler extends SpecificMethodFrameListener
     {
 
@@ -613,12 +625,12 @@ public final class AMQSession_0_8 extend
         return okHandler._messageCount;
     }
 
-    protected final boolean tagLE(long tag1, long tag2)
+    protected boolean tagLE(long tag1, long tag2)
     {
         return tag1 <= tag2;
     }
 
-    protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
+    protected boolean updateRollbackMark(long currentMark, long deliveryTag)
     {
         return false;
     }
@@ -695,4 +707,55 @@ public final class AMQSession_0_8 extend
             return null;
         }
     }
+
+    public abstract static class DestinationCache<T extends AMQDestination>
+    {
+        private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>();
+
+        public T getDestination(AMQShortString exchangeName, AMQShortString routingKey)
+        {
+            Map<AMQShortString, T> routingMap = cache.get(exchangeName);
+            if(routingMap == null)
+            {
+                routingMap = new LinkedHashMap<AMQShortString, T>()
+                {
+
+                    protected boolean removeEldestEntry(Map.Entry<AMQShortString, T> eldest)
+                    {
+                        return size() >= 200;
+                    }
+                };
+                cache.put(exchangeName,routingMap);
+            }
+            T destination = routingMap.get(routingKey);
+            if(destination == null)
+            {
+                destination = newDestination(exchangeName, routingKey);
+                routingMap.put(routingKey,destination);
+            }
+            return destination;
+        }
+
+        protected abstract T newDestination(AMQShortString exchangeName, AMQShortString routingKey);
+    }
+
+    private static class TopicDestinationCache extends DestinationCache<AMQTopic>
+    {
+        protected AMQTopic newDestination(AMQShortString exchangeName, AMQShortString routingKey)
+        {
+            return new AMQTopic(exchangeName, routingKey, null);
+        }
+    }
+
+    private static class QueueDestinationCache extends DestinationCache<AMQQueue>
+    {
+        protected AMQQueue newDestination(AMQShortString exchangeName, AMQShortString routingKey)
+        {
+            return new AMQQueue(exchangeName, routingKey, routingKey);
+        }
+    }
+
+    private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache();
+    private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache();
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Mar  1 15:42:44 2012
@@ -272,10 +272,8 @@ public class BasicMessageConsumer_0_10 e
      */
     private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
     {
-        final RangeSet ranges = new RangeSet();
-        ranges.add((int) message.getDeliveryTag());
         _0_10session.messageAcknowledge
-            (ranges,
+            (Range.newInstance((int) message.getDeliveryTag()),
              _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
 
         final AMQException amqe = _0_10session.getCurrentException();
@@ -294,9 +292,7 @@ public class BasicMessageConsumer_0_10 e
      */
     private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
     {
-        final RangeSet ranges = new RangeSet();
-        ranges.add((int) message.getDeliveryTag());
-        _0_10session.flushProcessed(ranges,false);
+        _0_10session.flushProcessed(Range.newInstance((int) message.getDeliveryTag()),false);
 
         final AMQException amqe = _0_10session.getCurrentException();
         if (amqe != null)
@@ -315,10 +311,8 @@ public class BasicMessageConsumer_0_10 e
     private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
     {
         boolean result = false;
-        final RangeSet ranges = new RangeSet();
-        ranges.add((int) message.getDeliveryTag());
 
-        final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+        final Acquired acq = _0_10session.getQpidSession().messageAcquire(Range.newInstance((int)message.getDeliveryTag())).get();
 
         final RangeSet acquired = acq.getTransfers();
         if (acquired != null && acquired.size() > 0)
@@ -451,7 +445,7 @@ public class BasicMessageConsumer_0_10 e
     {
         if (_synchronousQueue.size() > 0)
         {
-            RangeSet ranges = new RangeSet();
+            RangeSet ranges = RangeSetFactory.createRangeSet();
             Iterator iterator = _synchronousQueue.iterator();
             while (iterator.hasNext())
             {
@@ -551,7 +545,7 @@ public class BasicMessageConsumer_0_10 e
         }
         else if (getSession().prefetch())
         {
-            capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+            capacity = getSession().getPrefetch();
         }
         return capacity;
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Mar  1 15:42:44 2012
@@ -38,11 +38,13 @@ import org.slf4j.LoggerFactory;
 public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
 {
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
+    private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
+    private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;
 
     private final RejectBehaviour _rejectBehaviour;
 
     protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
-                                       String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+                                       String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
                                        AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
                                        boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
     {
@@ -60,6 +62,9 @@ public class BasicMessageConsumer_0_8 ex
             consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
         }
 
+        _topicDestinationCache = session.getTopicDestinationCache();
+        _queueDestinationCache = session.getQueueDestinationCache();
+
         if (destination.getRejectBehaviour() != null)
         {
             _rejectBehaviour = destination.getRejectBehaviour();
@@ -100,7 +105,8 @@ public class BasicMessageConsumer_0_8 ex
 
         return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
                                              messageFrame.isRedelivered(), messageFrame.getExchange(),
-                                             messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+                                             messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
+                _queueDestinationCache, _topicDestinationCache);
 
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Mar  1 15:42:44 2012
@@ -40,6 +40,7 @@ import org.apache.qpid.framing.Composite
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.MethodRegistry;
 
 public class BasicMessageProducer_0_8 extends BasicMessageProducer
 {
@@ -53,15 +54,17 @@ public class BasicMessageProducer_0_8 ex
     void declareDestination(AMQDestination destination)
     {
 
-        ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
-                                                                                              destination.getExchangeName(),
-                                                                                              destination.getExchangeClass(),
-                                                                                              false,
-                                                                                              false,
-                                                                                              false,
-                                                                                              false,
-                                                                                              true,
-                                                                                              null);
+        final MethodRegistry methodRegistry = getSession().getMethodRegistry();
+        ExchangeDeclareBody body =
+                methodRegistry.createExchangeDeclareBody(_session.getTicket(),
+                                                         destination.getExchangeName(),
+                                                         destination.getExchangeClass(),
+                                                         destination.getExchangeName().toString().startsWith("amq."),
+                                                         false,
+                                                         false,
+                                                         false,
+                                                         true,
+                                                         null);
         // Declare the exchange
         // Note that the durable and internal arguments are ignored since passive is set to false
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org