You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC

svn commit: r1567616 [10/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/ qpid/cpp/bindings/qpid/ruby/ q...

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Wed Feb 12 13:27:51 2014
@@ -28,11 +28,11 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.flow.LimitlessCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.BrokerTestHelper;
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.test.utils.QpidTestCase;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.Set;
 
 /**
@@ -47,7 +48,8 @@ import java.util.Set;
  */
 public class AckTest extends QpidTestCase
 {
-    private Subscription _subscription;
+    private ConsumerTarget_0_8 _subscriptionTarget;
+    private Consumer _consumer;
 
     private AMQProtocolSession _protocolSession;
 
@@ -86,7 +88,6 @@ public class AckTest extends QpidTestCas
 
     private void publishMessages(int count, boolean persistent) throws AMQException
     {
-        _queue.registerSubscription(_subscription,false);
         for (int i = 1; i <= count; i++)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -144,7 +145,7 @@ public class AckTest extends QpidTestCas
                     try
                     {
 
-                        _queue.enqueue(message);
+                        _queue.enqueue(message,null);
                     }
                     catch (AMQException e)
                     {
@@ -178,7 +179,13 @@ public class AckTest extends QpidTestCas
      */
     public void testAckChannelAssociationTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount, true);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -190,8 +197,8 @@ public class AckTest extends QpidTestCas
         {
             assertTrue(deliveryTag == i);
             i++;
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
         }
 
     }
@@ -202,7 +209,16 @@ public class AckTest extends QpidTestCas
     public void testNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+        _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+                                                                   DEFAULT_CONSUMER_TAG,
+                                                                   null,
+                                                                   new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget,
+                                       null,
+                                       AMQMessage.class,
+                                       DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -218,7 +234,13 @@ public class AckTest extends QpidTestCas
     public void testPersistentNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+                                                                   DEFAULT_CONSUMER_TAG,
+                                                                   null,
+                                                                   new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount, true);
 
@@ -235,7 +257,15 @@ public class AckTest extends QpidTestCas
      */
     public void testSingleAckReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -248,8 +278,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             // 5 is the delivery tag of the message that *should* be removed
             if (++i == 5)
             {
@@ -264,7 +294,15 @@ public class AckTest extends QpidTestCas
      */
     public void testMultiAckReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -279,8 +317,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i + 5);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             ++i;
         }
     }
@@ -290,7 +328,15 @@ public class AckTest extends QpidTestCas
      */
     public void testMultiAckAllReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -303,8 +349,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i + 5);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             ++i;
         }
     }
@@ -319,12 +365,16 @@ public class AckTest extends QpidTestCas
         // Send 10 messages
         Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
 
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
-                                                                            DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
+
         final int msgCount = 1;
         publishMessages(msgCount);
 
-        _queue.deliverAsync(_subscription);
+        _consumer.externalStateChange();
 
         _channel.acknowledgeMessage(1, false);
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Wed Feb 12 13:27:51 2014
@@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.util.BrokerTestHelper;
@@ -36,7 +36,7 @@ import java.util.List;
 public class AcknowledgeTest extends QpidTestCase
 {
     private AMQChannel _channel;
-    private SimpleAMQQueue _queue;
+    private AMQQueue _queue;
     private MessageStore _messageStore;
     private String _queueName;
 
@@ -79,7 +79,7 @@ public class AcknowledgeTest extends Qpi
         return (InternalTestProtocolSession)_channel.getProtocolSession();
     }
 
-    private SimpleAMQQueue getQueue()
+    private AMQQueue getQueue()
     {
         return _queue;
     }
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends Qpi
         assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
 
         //Subscribe to the queue
-        AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
+        AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
 
         getQueue().deliverAsync();
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Wed Feb 12 13:27:51 2014
@@ -23,20 +23,22 @@ package org.apache.qpid.server.protocol.
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.SimpleQueueEntryList;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
  *
@@ -59,40 +61,50 @@ public class ExtractResendAndRequeueTest
 
     private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
     private static final int INITIAL_MSG_COUNT = 10;
-    private AMQQueue _queue = new MockAMQQueue(getName());
-    private MessageStore _messageStore = new TestMemoryMessageStore();
-    private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+    private AMQQueue _queue;
+    private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
+    private Consumer _consumer;
+    private boolean _queueDeleted;
 
     @Override
     public void setUp() throws AMQException
     {
+        _queueDeleted = false;
         _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+        _queue = mock(AMQQueue.class);
+        when(_queue.getName()).thenReturn(getName());
+        when(_queue.isDeleted()).thenReturn(_queueDeleted);
+        _consumer = mock(Consumer.class);
+        when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+
 
         long id = 0;
-        SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);
 
         // Add initial messages to QueueEntryList
         for (int count = 0; count < INITIAL_MSG_COUNT; count++)
         {
-            AMQMessage msg = new MockAMQMessage(id);
-
-            list.add(msg);
+            ServerMessage msg = mock(ServerMessage.class);
+            when(msg.getMessageNumber()).thenReturn(id);
+            final QueueEntry entry = mock(QueueEntry.class);
+            when(entry.getMessage()).thenReturn(msg);
+            when(entry.getQueue()).thenReturn(_queue);
+            when(entry.isQueueDeleted()).thenReturn(_queueDeleted);
+            doAnswer(new Answer()
+            {
+                @Override
+                public Object answer(final InvocationOnMock invocation) throws Throwable
+                {
+                    when(entry.isDeleted()).thenReturn(true);
+                    return null;
+                }
+            }).when(entry).delete();
 
+            _unacknowledgedMessageMap.add(id, entry);
+            _referenceList.add(entry);
             //Increment ID;
             id++;
         }
 
-        // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList
-        QueueEntryIterator queueEntries = list.iterator();
-        while(queueEntries.advance())
-        {
-            QueueEntry entry = queueEntries.getNode();
-            _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
-
-            // Store the entry for future inspection
-            _referenceList.add(entry);
-        }
-
         assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
     }
 
@@ -103,17 +115,14 @@ public class ExtractResendAndRequeueTest
      *
      * @return Subscription that performed the acquire
      */
-    private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
+    private void acquireMessages(LinkedList<MessageInstance> messageList)
     {
-        Subscription subscription = new MockSubscription();
 
-        // Aquire messages in subscription
-        for (QueueEntry entry : messageList)
+        // Acquire messages in subscription
+        for(MessageInstance entry : messageList)
         {
-            entry.acquire(subscription);
+            when(entry.getDeliveredConsumer()).thenReturn(_consumer);
         }
-
-        return subscription;
     }
 
     /**
@@ -128,14 +137,14 @@ public class ExtractResendAndRequeueTest
     public void testResend() throws AMQException
     {
         //We don't need the subscription object here.
-        createSubscriptionAndAcquireMessages(_referenceList);
+        acquireMessages(_referenceList);
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         // requeueIfUnableToResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
+                                                                    msgToResend));
 
         assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -154,100 +163,22 @@ public class ExtractResendAndRequeueTest
      */
     public void testRequeueDueToSubscriptionClosure() throws AMQException
     {
-        Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
+        acquireMessages(_referenceList);
 
         // Close subscription
-        subscription.close();
+        when(_consumer.isClosed()).thenReturn(true);
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         // requeueIfUnableToResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-    }
-
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued
-     * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map.
-     *
-     * @throws AMQException the visit interface throws this
-     */
-
-    public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        // requeueIfUnableToResend = true so all messages should go to msgToRequeue
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
+                                                                    msgToResend));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
         assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
     }
 
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, And we request that we don't
-     * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue.
-     *
-     * @throws AMQException the visit interface throws this
-     */
-
-    public void testDrop() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-
-        for (QueueEntry entry : _referenceList)
-        {
-            assertTrue("Message was not discarded", entry.isDeleted());
-        }
-
-    }
-
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was
-     * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the
-     * future we may wish to dead letter the message.
-     *
-     * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
-     *
-     * @throws AMQException the visit interface throws this
-     */
-    public void testDiscard() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        _queue.delete();
-
-        // requeueIfUnableToResend : value doesn't matter here as queue has been deleted
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-        for (QueueEntry entry : _referenceList)
-        {
-            assertTrue("Message was not discarded", entry.isDeleted());
-        }
-    }
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Wed Feb 12 13:27:51 2014
@@ -47,11 +47,9 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -60,7 +58,7 @@ public class InternalTestProtocolSession
 {
     private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class);
     // ChannelID(LIST)  -> LinkedList<Pair>
-    private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+    private final Map<Integer, Map<String, LinkedList<DeliveryPair>>> _channelDelivers;
     private AtomicInteger _deliveryCount = new AtomicInteger(0);
     private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
@@ -68,7 +66,7 @@ public class InternalTestProtocolSession
     {
         super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
 
-        _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+        _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>();
 
         setTestAuthorizedSubject();
         setVirtualHost(virtualHost);
@@ -117,7 +115,7 @@ public class InternalTestProtocolSession
     {
         synchronized (_channelDelivers)
         {
-            List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+            List<DeliveryPair> all =_channelDelivers.get(channelId).get(AMQShortString.toString(consumerTag));
 
             if (all == null)
             {
@@ -153,23 +151,23 @@ public class InternalTestProtocolSession
 
         synchronized (_channelDelivers)
         {
-            Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+            Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
 
             if (consumers == null)
             {
-                consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+                consumers = new HashMap<String, LinkedList<DeliveryPair>>();
                 _channelDelivers.put(channelId, consumers);
             }
 
-            LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+            LinkedList<DeliveryPair> consumerDelivers = consumers.get(AMQShortString.toString(consumerTag));
 
             if (consumerDelivers == null)
             {
                 consumerDelivers = new LinkedList<DeliveryPair>();
-                consumers.put(consumerTag, consumerDelivers);
+                consumers.put(consumerTag.toString(), consumerDelivers);
             }
 
-            consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
+            consumerDelivers.add(new DeliveryPair(deliveryTag, msg));
         }
     }
 
@@ -247,27 +245,27 @@ public class InternalTestProtocolSession
 
 
         @Override
-        public void deliverToClient(Subscription sub, ServerMessage message,
+        public void deliverToClient(Consumer sub, ServerMessage message,
                                     InstanceProperties props, long deliveryTag) throws AMQException
         {
             _deliveryCount.incrementAndGet();
 
             synchronized (_channelDelivers)
             {
-                Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+                Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
 
                 if (consumers == null)
                 {
-                    consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+                    consumers = new HashMap<String, LinkedList<DeliveryPair>>();
                     _channelDelivers.put(_channelId, consumers);
                 }
 
-                LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
+                LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getName());
 
                 if (consumerDelivers == null)
                 {
                     consumerDelivers = new LinkedList<DeliveryPair>();
-                    consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
+                    consumers.put(sub.getName(), consumerDelivers);
                 }
 
                 consumerDelivers.add(new DeliveryPair(deliveryTag, message));

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Wed Feb 12 13:27:51 2014
@@ -26,10 +26,8 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -39,7 +37,7 @@ import java.util.List;
 public class QueueBrowserUsesNoAckTest extends QpidTestCase
 {
     private AMQChannel _channel;
-    private SimpleAMQQueue _queue;
+    private AMQQueue _queue;
     private MessageStore _messageStore;
     private String _queueName;
 
@@ -82,7 +80,7 @@ public class QueueBrowserUsesNoAckTest e
         return (InternalTestProtocolSession)_channel.getProtocolSession();
     }
 
-    private SimpleAMQQueue getQueue()
+    private AMQQueue getQueue()
     {
         return _queue;
     }
@@ -130,8 +128,7 @@ public class QueueBrowserUsesNoAckTest e
         //Check the process didn't suspend the subscription as this would
         // indicate we are using the prefetch credit. i.e. using acks not No-Ack
         assertTrue("The subscription has been suspended",
-                   !getChannel().getSubscription(browser).getState()
-                           .equals(Subscription.State.SUSPENDED));
+                   !getChannel().getSubscription(browser).isSuspended());
     }
 
     private void checkStoreContents(int messageCount)
@@ -144,6 +141,6 @@ public class QueueBrowserUsesNoAckTest e
         FieldTable filters = new FieldTable();
         filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
 
-        return channel.subscribeToQueue(null, queue, true, filters, false, true);
+        return channel.consumeFromSource(null, queue, true, filters, true, false);
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,36 +16,37 @@
   limitations under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
   <parent>
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-project</artifactId>
-    <version>0.26-SNAPSHOT</version>
+    <artifactId>qpid-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
+  <version>0.28-SNAPSHOT</version>
+  <name>Qpid AMQP 1-0 Protocol Broker Plug-in</name>
+  <description>AMQP 1-0 protocol broker plug-in</description>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-amqp-1-0-common</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
-      <version>${log4j-version}</version>
-      <scope>compile</scope>
     </dependency>
 
   </dependencies>

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
            ('svn:mergeinfo' removed)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Wed Feb 12 13:27:51 2014
@@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
@@ -53,16 +54,8 @@ public class Connection_1_0 implements C
     private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
     private final Object _reference = new Object();
 
-
-
-    public static interface Task
-    {
-        public void doTask(Connection_1_0 connection);
-    }
-
-
-    private List<Task> _closeTasks =
-            Collections.synchronizedList(new ArrayList<Task>());
+    private List<Action<Connection_1_0>> _closeTasks =
+            Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
 
 
 
@@ -98,26 +91,26 @@ public class Connection_1_0 implements C
         _sessions.remove(session);
     }
 
-    void removeConnectionCloseTask(final Task task)
+    void removeConnectionCloseTask(final Action<Connection_1_0> task)
     {
         _closeTasks.remove( task );
     }
 
-    void addConnectionCloseTask(final Task task)
+    void addConnectionCloseTask(final Action<Connection_1_0> task)
     {
         _closeTasks.add( task );
     }
 
     public void closeReceived()
     {
-        List<Task> taskCopy;
+        List<Action<Connection_1_0>> taskCopy;
         synchronized (_closeTasks)
         {
-            taskCopy = new ArrayList<Task>(_closeTasks);
+            taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
         }
-        for(Task task : taskCopy)
+        for(Action<Connection_1_0> task : taskCopy)
         {
-            task.doTask(this);
+            task.performAction(this);
         }
         synchronized (_closeTasks)
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Wed Feb 12 13:27:51 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
 import java.io.EOFException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.ListIterator;
@@ -157,7 +156,7 @@ public abstract class MessageConverter_t
         }
     }
 
-    private static Map fixMapValues(final Map<String, Object> map)
+    static Map fixMapValues(final Map<String, Object> map)
     {
         for(Map.Entry<String,Object> entry : map.entrySet())
         {
@@ -166,7 +165,7 @@ public abstract class MessageConverter_t
         return map;
     }
 
-    private static Object fixValue(final Object value)
+    static Object fixValue(final Object value)
     {
         if(value instanceof byte[])
         {
@@ -186,7 +185,7 @@ public abstract class MessageConverter_t
         }
     }
 
-    private static List fixListValues(final List list)
+    static List fixListValues(final List list)
     {
         ListIterator iterator = list.listIterator();
         while(iterator.hasNext())
@@ -199,83 +198,88 @@ public abstract class MessageConverter_t
     }
 
     private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
-                                                                      final ServerMessage serverMessage,
+                                                                      final M serverMessage,
                                                                       SectionEncoder sectionEncoder)
     {
-            final String mimeType = serverMessage.getMessageHeader().getMimeType();
-            byte[] data = new byte[(int) serverMessage.getSize()];
-            serverMessage.getContent(ByteBuffer.wrap(data), 0);
+        final String mimeType = serverMessage.getMessageHeader().getMimeType();
+        Section bodySection = getBodySection(serverMessage, mimeType);
 
-            Section bodySection = convertMessageBody(mimeType, data);
+        final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
 
-            final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
-
-            return new StoredMessage<MessageMetaData_1_0>()
-            {
-                @Override
-                public MessageMetaData_1_0 getMetaData()
-                {
-                    return metaData;
-                }
-
-                @Override
-                public long getMessageNumber()
-                {
-                    return serverMessage.getMessageNumber();
-                }
-
-                @Override
-                public void addContent(int offsetInMessage, ByteBuffer src)
-                {
-                    throw new UnsupportedOperationException();
-                }
-
-                @Override
-                public int getContent(int offsetInMessage, ByteBuffer dst)
-                {
-                    ByteBuffer buf = allData.duplicate();
-                    buf.position(offsetInMessage);
-                    buf = buf.slice();
-                    int size;
-                    if(dst.remaining()<buf.remaining())
-                    {
-                        buf.limit(dst.remaining());
-                        size = dst.remaining();
-                    }
-                    else
+        return new StoredMessage<MessageMetaData_1_0>()
                     {
-                        size = buf.remaining();
-                    }
-                    dst.put(buf);
-                    return size;
-                }
-
-                @Override
-                public ByteBuffer getContent(int offsetInMessage, int size)
-                {
-                    ByteBuffer buf = allData.duplicate();
-                    buf.position(offsetInMessage);
-                    buf = buf.slice();
-                    if(size < buf.remaining())
-                    {
-                        buf.limit(size);
-                    }
-                    return buf;
-                }
+                        @Override
+                        public MessageMetaData_1_0 getMetaData()
+                        {
+                            return metaData;
+                        }
+
+                        @Override
+                        public long getMessageNumber()
+                        {
+                            return serverMessage.getMessageNumber();
+                        }
+
+                        @Override
+                        public void addContent(int offsetInMessage, ByteBuffer src)
+                        {
+                            throw new UnsupportedOperationException();
+                        }
+
+                        @Override
+                        public int getContent(int offsetInMessage, ByteBuffer dst)
+                        {
+                            ByteBuffer buf = allData.duplicate();
+                            buf.position(offsetInMessage);
+                            buf = buf.slice();
+                            int size;
+                            if(dst.remaining()<buf.remaining())
+                            {
+                                buf.limit(dst.remaining());
+                                size = dst.remaining();
+                            }
+                            else
+                            {
+                                size = buf.remaining();
+                            }
+                            dst.put(buf);
+                            return size;
+                        }
+
+                        @Override
+                        public ByteBuffer getContent(int offsetInMessage, int size)
+                        {
+                            ByteBuffer buf = allData.duplicate();
+                            buf.position(offsetInMessage);
+                            buf = buf.slice();
+                            if(size < buf.remaining())
+                            {
+                                buf.limit(size);
+                            }
+                            return buf;
+                        }
+
+                        @Override
+                        public StoreFuture flushToStore()
+                        {
+                            throw new UnsupportedOperationException();
+                        }
+
+                        @Override
+                        public void remove()
+                        {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+    }
 
-                @Override
-                public StoreFuture flushToStore()
-                {
-                    throw new UnsupportedOperationException();
-                }
+    protected Section getBodySection(final M serverMessage, final String mimeType)
+    {
+        byte[] data = new byte[(int) serverMessage.getSize()];
+        serverMessage.getContent(ByteBuffer.wrap(data), 0);
 
-                @Override
-                public void remove()
-                {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
+        return convertMessageBody(mimeType, data);
+    }
 
     private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
     {
@@ -286,7 +290,7 @@ public abstract class MessageConverter_t
         Binary dataEncoding = sectionEncoder.getEncoding();
 
         final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
-        metaData.writeToBuffer(0,allData);
+        metaData.writeToBuffer(allData);
         allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
         return allData;
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Wed Feb 12 13:27:51 2014
@@ -314,7 +314,7 @@ public class MessageMetaData_1_0 impleme
         return buf;
     }
 
-    public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+    public int writeToBuffer(ByteBuffer dest)
     {
         ByteBuffer buf = _encoded;
 
@@ -326,7 +326,7 @@ public class MessageMetaData_1_0 impleme
 
         buf = buf.duplicate();
 
-        buf.position(offsetInMetaData);
+        buf.position(0);
 
         if(dest.remaining() < buf.limit())
         {

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
            ('svn:mergeinfo' removed)

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
            ('svn:mergeinfo' removed)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Wed Feb 12 13:27:51 2014
@@ -24,22 +24,21 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.amqp_1_0.type.Outcome;
 import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
 
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.queue.AMQQueue;
 
 import org.apache.qpid.server.txn.ServerTransaction;
 
-public class QueueDestination implements SendingDestination, ReceivingDestination
+public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
 {
     private static final Logger _logger = Logger.getLogger(QueueDestination.class);
     private static final Accepted ACCEPTED = new Accepted();
     private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
 
 
-    private AMQQueue _queue;
-
     public QueueDestination(AMQQueue queue)
     {
-        _queue = queue;
+        super(queue);
     }
 
     public Outcome[] getOutcomes()
@@ -52,7 +51,7 @@ public class QueueDestination implements
 
         try
         {
-            txn.enqueue(_queue,message, new ServerTransaction.Action()
+            txn.enqueue(getQueue(),message, new ServerTransaction.Action()
             {
 
 
@@ -60,8 +59,7 @@ public class QueueDestination implements
                 {
                     try
                     {
-
-                        _queue.enqueue(message);
+                        getQueue().enqueue(message,null);
                     }
                     catch (Exception e)
                     {
@@ -93,7 +91,7 @@ public class QueueDestination implements
 
     public AMQQueue getQueue()
     {
-        return _queue;
+        return (AMQQueue) super.getQueue();
     }
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Feb 12 13:27:51 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,16 +43,7 @@ import org.apache.qpid.amqp_1_0.type.Del
 import org.apache.qpid.amqp_1_0.type.Outcome;
 import org.apache.qpid.amqp_1_0.type.Symbol;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
 import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
 import org.apache.qpid.amqp_1_0.type.transport.Detach;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
@@ -64,11 +56,14 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
@@ -78,18 +73,22 @@ public class SendingLink_1_0 implements 
     private VirtualHost _vhost;
     private SendingDestination _destination;
 
-    private Subscription_1_0 _subscription;
+    private Consumer _consumer;
+    private ConsumerTarget_1_0 _target;
+
     private boolean _draining;
-    private final Map<Binary, QueueEntry> _unsettledMap =
-            new HashMap<Binary, QueueEntry>();
+    private final Map<Binary, MessageInstance> _unsettledMap =
+            new HashMap<Binary, MessageInstance>();
 
     private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
             new ConcurrentHashMap<Binary, UnsettledAction>();
     private volatile SendingLinkAttachment _linkAttachment;
     private TerminusDurability _durability;
-    private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+    private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
     private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
     private Runnable _closeAction;
+    private final MessageSource _queue;
+
 
     public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
                            final VirtualHost vhost,
@@ -103,24 +102,22 @@ public class SendingLink_1_0 implements 
         _durability = source.getDurable();
         linkAttachment.setDeliveryStateHandler(this);
         QueueDestination qd = null;
-        AMQQueue queue = null;
 
+        EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
 
 
         boolean noLocal = false;
         JMSSelectorFilter messageFilter = null;
 
-        if(destination instanceof QueueDestination)
+        if(destination instanceof MessageSourceDestination)
         {
-            queue = ((QueueDestination) _destination).getQueue();
+            _queue = ((MessageSourceDestination) _destination).getQueue();
 
-            if(queue.getAvailableAttributes().contains("topic"))
+            if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic"))
             {
                 source.setDistributionMode(StdDistMode.COPY);
             }
 
-            qd = (QueueDestination) destination;
-
             Map<Symbol,Filter> filters = source.getFilter();
 
             Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
@@ -167,7 +164,13 @@ public class SendingLink_1_0 implements 
             }
             source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
 
-            _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY);
+            _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+            if(source.getDistributionMode() != StdDistMode.COPY)
+            {
+                options.add(Consumer.Option.ACQUIRES);
+                options.add(Consumer.Option.SEES_REQUEUES);
+            }
+
         }
         else if(destination instanceof ExchangeDestination)
         {
@@ -199,7 +202,7 @@ public class SendingLink_1_0 implements 
                     name = UUID.randomUUID().toString();
                 }
 
-                queue = _vhost.getQueue(name);
+                AMQQueue queue = _vhost.getQueue(name);
                 Exchange exchange = exchangeDestination.getExchange();
 
                 if(queue == null)
@@ -299,9 +302,10 @@ public class SendingLink_1_0 implements 
                         }
                     }
                 }
+                _queue = queue;
                 source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
 
-                exchange.addBinding(binding,queue,null);
+                exchange.addBinding(binding, queue,null);
                 source.setDistributionMode(StdDistMode.COPY);
 
                 if(!isDurable)
@@ -309,10 +313,10 @@ public class SendingLink_1_0 implements 
                     final String queueName = name;
                     final AMQQueue tempQueue = queue;
 
-                    final Connection_1_0.Task deleteQueueTask =
-                                            new Connection_1_0.Task()
+                    final Action<Connection_1_0> deleteQueueTask =
+                                            new Action<Connection_1_0>()
                                             {
-                                                public void doTask(Connection_1_0 session)
+                                                public void performAction(Connection_1_0 session)
                                                 {
                                                     if (_vhost.getQueue(queueName) == tempQueue)
                                                     {
@@ -331,9 +335,9 @@ public class SendingLink_1_0 implements 
 
                                     getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
 
-                                    queue.addQueueDeleteTask(new AMQQueue.Task()
+                                    queue.addQueueDeleteTask(new Action<AMQQueue>()
                                     {
-                                        public void doTask(AMQQueue queue)
+                                        public void performAction(AMQQueue queue)
                                         {
                                             getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
                                         }
@@ -347,31 +351,52 @@ public class SendingLink_1_0 implements 
             catch (AMQSecurityException e)
             {
                 _logger.error("Security error", e);
+                throw new RuntimeException(e);
             }
             catch (AMQInternalException e)
             {
                 _logger.error("Internal error", e);
+                throw new RuntimeException(e);
             }
             catch (AMQException e)
             {
                 _logger.error("Error", e);
+                throw new RuntimeException(e);
             }
-            _subscription = new Subscription_1_0(this, qd, true);
 
+
+            _target = new ConsumerTarget_1_0(this, true);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
+
+        }
+        else
+        {
+            throw new RuntimeException("Unknown destination type");
         }
 
-        if(_subscription != null)
+        if(_target != null)
         {
-            _subscription.setNoLocal(noLocal);
-            if(messageFilter!=null)
+            if(noLocal)
             {
-                _subscription.setFilters(new SimpleFilterManager(messageFilter));
+                options.add(Consumer.Option.NO_LOCAL);
             }
 
             try
             {
-
-                queue.registerSubscription(_subscription, false);
+                final String name;
+                if(getEndpoint().getTarget() instanceof Target)
+                {
+                    Target target = (Target) getEndpoint().getTarget();
+                    name = target.getAddress() == null ? getEndpoint().getName() : target.getAddress();
+                }
+                else
+                {
+                    name = getEndpoint().getName();
+                }
+                _consumer = _queue.addConsumer(_target,
+                                               messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+                                               Message_1_0.class, name, options);
             }
             catch (AMQException e)
             {
@@ -394,12 +419,11 @@ public class SendingLink_1_0 implements 
         // if not durable or close
         if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
         {
-            AMQQueue queue = _subscription.getQueue();
 
             try
             {
 
-                queue.unregisterSubscription(_subscription);
+                _consumer.close();
 
             }
             catch (AMQException e)
@@ -426,7 +450,7 @@ public class SendingLink_1_0 implements 
             {
                 try
                 {
-                    queue.getVirtualHost().removeQueue(queue);
+                    _vhost.removeQueue((AMQQueue)_queue);
                 }
                 catch(AMQException e)
                 {
@@ -443,7 +467,7 @@ public class SendingLink_1_0 implements 
         else if(detach == null || detach.getError() != null)
         {
             _linkAttachment = null;
-            _subscription.flowStateChanged();
+            _target.flowStateChanged();
         }
         else
         {
@@ -491,7 +515,7 @@ public class SendingLink_1_0 implements 
         }
         if(_resumeAcceptedTransfers.isEmpty())
         {
-            _subscription.flowStateChanged();
+            _target.flowStateChanged();
         }
 
     }
@@ -531,7 +555,7 @@ public class SendingLink_1_0 implements 
         }
     }
 
-    public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+    public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
     {
         _unsettledActionMap.put(tag,unsettledAction);
         if(getTransactionId() == null)
@@ -593,9 +617,9 @@ public class SendingLink_1_0 implements 
     public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
     {
 
-        if(_subscription.isActive())
+        if(_consumer.isActive())
         {
-            _subscription.suspend();
+            _target.suspend();
         }
 
         _linkAttachment = linkAttachment;
@@ -603,14 +627,14 @@ public class SendingLink_1_0 implements 
         SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
         endpoint.setDeliveryStateHandler(this);
         Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
-        Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+        Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
         _resumeAcceptedTransfers.clear();
         _resumeFullTransfers.clear();
 
-        for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+        for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
         {
             Binary deliveryTag = entry.getKey();
-            final QueueEntry queueEntry = entry.getValue();
+            final MessageInstance queueEntry = entry.getValue();
             if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
             {
                 queueEntry.setRedelivered();
@@ -624,7 +648,7 @@ public class SendingLink_1_0 implements 
                 if(outcome instanceof Accepted)
                 {
                     AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
-                    if(_subscription.acquires())
+                    if(_consumer.acquires())
                     {
                         txn.dequeue(Collections.singleton(queueEntry),
                                 new ServerTransaction.Action()
@@ -644,7 +668,7 @@ public class SendingLink_1_0 implements 
                 else if(outcome instanceof Released)
                 {
                     AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
-                    if(_subscription.acquires())
+                    if(_consumer.acquires())
                     {
                         txn.dequeue(Collections.singleton(queueEntry),
                                 new ServerTransaction.Action()
@@ -678,9 +702,9 @@ public class SendingLink_1_0 implements 
 
     public Map getUnsettledOutcomeMap()
     {
-        Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+        Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
 
-        for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+        for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
         {
             entry.setValue(null);
         }
@@ -692,4 +716,9 @@ public class SendingLink_1_0 implements 
     {
         _closeAction = action;
     }
+
+    public VirtualHost getVirtualHost()
+    {
+        return _vhost;
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Feb 12 13:27:51 2014
@@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityExcept
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -48,6 +50,7 @@ import org.apache.qpid.server.protocol.L
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.*;
@@ -108,11 +111,11 @@ public class Session_1_0 implements Sess
                         source.setAddress(tempQueue.getName());
                     }
                     String addr = source.getAddress();
-                    AMQQueue queue = _vhost.getQueue(addr);
+                    MessageSource queue = _vhost.getMessageSource(addr);
                     if(queue != null)
                     {
 
-                        destination = new QueueDestination(queue);
+                        destination = new MessageSourceDestination(queue);
 
 
 
@@ -249,11 +252,11 @@ public class Session_1_0 implements Sess
                         }
 
                         String addr = target.getAddress();
-                        Exchange exchg = _vhost.getExchange(addr);
-                        if(exchg != null)
+                        MessageDestination messageDestination = _vhost.getMessageDestination(addr);
+                        if(messageDestination != null)
                         {
-                            destination = new ExchangeDestination(exchg, target.getDurable(),
-                                                                  target.getExpiryPolicy());
+                            destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+                                                                       target.getExpiryPolicy());
                         }
                         else
                         {
@@ -343,10 +346,10 @@ public class Session_1_0 implements Sess
 
             if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
             {
-                final Connection_1_0.Task deleteQueueTask =
-                        new Connection_1_0.Task()
+                final Action<Connection_1_0> deleteQueueTask =
+                        new Action<Connection_1_0>()
                         {
-                            public void doTask(Connection_1_0 session)
+                            public void performAction(Connection_1_0 session)
                             {
                                 if (_vhost.getQueue(queueName) == tempQueue)
                                 {
@@ -365,9 +368,9 @@ public class Session_1_0 implements Sess
 
                 _connection.addConnectionCloseTask(deleteQueueTask);
 
-                queue.addQueueDeleteTask(new AMQQueue.Task()
+                queue.addQueueDeleteTask(new Action<AMQQueue>()
                 {
-                    public void doTask(AMQQueue queue)
+                    public void performAction(AMQQueue queue)
                     {
                         _connection.removeConnectionCloseTask(deleteQueueTask);
                     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,50 +16,50 @@
   limitations under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
   <parent>
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-project</artifactId>
-    <version>0.26-SNAPSHOT</version>
+    <artifactId>qpid-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>qpid-broker-plugins-amqp-msg-conv-0-10-to-1-0</artifactId>
+  <version>0.28-SNAPSHOT</version>
+  <name>Qpid AMQP 0-10 to 1-0 Message Conversion Broker Plug-in</name>
+  <description>AMQP message conversion (0-10 to 1-0) broker plug-in</description>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-common</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency> 
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-amqp-1-0-common</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
   </dependencies>
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,43 +16,44 @@
   limitations under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
   <parent>
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-project</artifactId>
-    <version>0.26-SNAPSHOT</version>
+    <artifactId>qpid-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>qpid-broker-plugins-amqp-msg-conv-0-8-to-0-10</artifactId>
+  <version>0.28-SNAPSHOT</version>
+  <name>Qpid AMQP 0-8 to 0-10 Message Conversion Broker Plug-in</name>
+  <description>AMQP message conversion (0-8, 0-9 and 0-9-1 to 0-10) broker plug-in</description>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-common</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
   </dependencies>
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Wed Feb 12 13:27:51 2014
@@ -61,7 +61,7 @@ public class MessageConverter_0_10_to_0_
         {
             if(deliveryProps.hasDeliveryMode())
             {
-                props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
+                props.setDeliveryMode((deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
                                               ? BasicContentHeaderProperties.PERSISTENT
                                               : BasicContentHeaderProperties.NON_PERSISTENT));
             }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,50 +16,50 @@
   limitations under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
   <parent>
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-project</artifactId>
-    <version>0.26-SNAPSHOT</version>
+    <artifactId>qpid-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>qpid-broker-plugins-amqp-msg-conv-0-8-to-1-0</artifactId>
+  <version>0.28-SNAPSHOT</version>
+  <name>Qpid AMQP 0-8 to 1-0 Message Conversion Broker Plug-in</name>
+  <description>AMQP message conversion (0-8, 0-9 and 0-9-1 to 1-0) broker plug-in</description>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-common</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-amqp-1-0-common</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
   </dependencies>
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,50 +16,49 @@
   limitations under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
   <parent>
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-project</artifactId>
-    <version>0.26-SNAPSHOT</version>
+    <artifactId>qpid-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>qpid-broker-plugins-derby-store</artifactId>
+  <version>0.28-SNAPSHOT</version>
+  <name>Qpid Derby Message Store</name>
+  <description>Apache Derby DB message store broker plug-in</description>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
-      <scope>compile</scope>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
-      <version>10.8.2.2</version>
-      <scope>compile</scope>
     </dependency>
 
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
-      <version>${log4j-version}</version>
-      <scope>compile</scope>
     </dependency>
 
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-test-utils</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,29 +16,31 @@
   limitations under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
   <parent>
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-project</artifactId>
-    <version>0.26-SNAPSHOT</version>
+    <artifactId>qpid-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>qpid-broker-plugins-jdbc-provider-bone</artifactId>
+  <version>0.28-SNAPSHOT</version>
+  <name>Qpid JDBC Message Store Connection Pooling Plug-in</name>
+  <description>JDBC Message Store Connection Pooling broker plug-in using BoneCP</description>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>com.jolbox</groupId>
       <artifactId>bonecp</artifactId>
-      <version>0.7.1.RELEASE</version>
-      <scope>compile</scope>
       <exclusions>
         <exclusion>
           <!-- exclude and specify a fixed version below -->
@@ -51,7 +53,6 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>14.0.1</version>
       <scope>runtime</scope>
     </dependency>
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,43 +16,45 @@
   limitations under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
   <parent>
     <groupId>org.apache.qpid</groupId>
-    <artifactId>qpid-project</artifactId>
-    <version>0.26-SNAPSHOT</version>
+    <artifactId>qpid-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>qpid-broker-plugins-jdbc-store</artifactId>
+  <version>0.28-SNAPSHOT</version>
+  <name>Qpid JDBC Message Store Broker Plug-in</name>
+  <description>JDBC message store broker plug-in</description>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
-      <version>${log4j-version}</version>
-      <scope>compile</scope>
     </dependency>
 
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-test-utils</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-broker-core</artifactId>
-      <version>0.26-SNAPSHOT</version>
+      <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
@@ -60,7 +62,6 @@
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
-      <version>10.8.2.2</version>
       <scope>test</scope>
     </dependency>
   </dependencies>



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org