You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/08 18:23:44 UTC

svn commit: r1616813 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/test/java/org/...

Author: rgodfrey
Date: Fri Aug  8 16:23:43 2014
New Revision: 1616813

URL: http://svn.apache.org/r1616813
Log:
QPID-4307 : [Java Broker] prevent the copying/moving of messages onto queues on which the message already exists

Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java
      - copied, changed from r1616557, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
    qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Aug  8 16:23:43 2014
@@ -511,7 +511,7 @@ public abstract class AbstractExchange<T
             Exchange altExchange = getAlternateExchange();
             if(altExchange != null)
             {
-                return ((ExchangeImpl)altExchange).send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
+                return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
             }
             else
             {
@@ -520,7 +520,24 @@ public abstract class AbstractExchange<T
         }
         else
         {
-            final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+            final BaseQueue[] baseQueues;
+
+            if(message.isReferenced())
+            {
+                ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(queues.size());
+                for(BaseQueue q : queues)
+                {
+                    if(!message.isReferenced(q))
+                    {
+                        uniqueQueues.add(q);
+                    }
+                }
+                baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]);
+            }
+            else
+            {
+                baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+            }
 
             txn.enqueue(queues,message, new ServerTransaction.Action()
             {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Fri Aug  8 16:23:43 2014
@@ -21,10 +21,17 @@
 package org.apache.qpid.server.message;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
@@ -33,10 +40,14 @@ public abstract class AbstractServerMess
     private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
             AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
 
+    private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater =
+            AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources");
+
 
     private volatile int _referenceCount = 0;
     private final StoredMessage<T> _handle;
     private final Object _connectionReference;
+    private volatile Collection<UUID> _resources;
 
 
     public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
@@ -117,6 +128,26 @@ public abstract class AbstractServerMess
     }
 
     @Override
+    final public MessageReference<X> newReference(TransactionLogResource object)
+    {
+        return new Reference(this, object);
+    }
+
+    @Override
+    final public boolean isReferenced(TransactionLogResource resource)
+    {
+        Collection<UUID> resources = _resources;
+        return resources != null && resources.contains(resource.getId());
+    }
+
+    @Override
+    final public boolean isReferenced()
+    {
+        Collection<UUID> resources = _resources;
+        return resources != null && !resources.isEmpty();
+    }
+
+    @Override
     final public boolean isPersistent()
     {
         return _handle.getMetaData().isPersistent();
@@ -156,15 +187,52 @@ public abstract class AbstractServerMess
                 AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released");
 
         private AbstractServerMessageImpl<X, T> _message;
+        private final UUID _resourceId;
         private volatile int _released;
 
         private Reference(final AbstractServerMessageImpl<X, T> message)
         {
+            this(message, null);
+        }
+        private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+        {
             _message = message;
+            if(resource != null)
+            {
+                Collection<UUID> currentValue;
+                Collection<UUID> newValue;
+                _resourceId = resource.getId();
+                do
+                {
+                    currentValue = _message._resources;
+
+                    if(currentValue == null)
+                    {
+                        newValue = Collections.singleton(_resourceId);
+                    }
+                    else
+                    {
+                        if(currentValue.contains(_resourceId))
+                        {
+                            throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource);
+                        }
+                        newValue = new ArrayList<>(currentValue.size()+1);
+                        newValue.addAll(currentValue);
+                        newValue.add(_resourceId);
+                    }
+
+                }
+                while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+            }
+            else
+            {
+                _resourceId = null;
+            }
             if(!_message.incrementReference())
             {
                 throw new MessageDeletedException(message.getMessageNumber());
             }
+
         }
 
         public X getMessage()
@@ -176,6 +244,34 @@ public abstract class AbstractServerMess
         {
             if(_releasedUpdater.compareAndSet(this,0,1))
             {
+                if(_resourceId != null)
+                {
+                    Collection<UUID> currentValue;
+                    Collection<UUID> newValue;
+                    do
+                    {
+                        currentValue = _message._resources;
+                        if(currentValue.size() == 1)
+                        {
+                            newValue = null;
+                        }
+                        else
+                        {
+                            UUID[] array = new UUID[currentValue.size()-1];
+                            int pos = 0;
+                            for(UUID uuid : currentValue)
+                            {
+                                if(!_resourceId.equals(uuid))
+                                {
+                                    array[pos++] = uuid;
+                                }
+                            }
+                            newValue = Arrays.asList(array);
+                        }
+                    }
+                    while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+
+                }
                 _message.decrementReference();
             }
         }

Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java (from r1616557, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java&r1=1616557&r2=1616813&rev=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java Fri Aug  8 16:23:43 2014
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.server.message;
 
-public class MessageDeletedException extends RuntimeException
+import org.apache.qpid.server.store.TransactionLogResource;
+
+public class MessageAlreadyReferencedException extends RuntimeException
 {
-    MessageDeletedException(final long messageNumber)
+    MessageAlreadyReferencedException(final long messageNumber, TransactionLogResource resource)
     {
-        super("The message with id " + messageNumber + " has already been deleted, no new reference can be taken");
+        super("The message with id " + messageNumber + " is already referenced by resource " + resource.getName());
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Fri Aug  8 16:23:43 2014
@@ -20,10 +20,11 @@
  */
 package org.apache.qpid.server.message;
 
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource
 {
@@ -41,6 +42,12 @@ public interface ServerMessage<T extends
 
     MessageReference newReference();
 
+    MessageReference newReference(TransactionLogResource object);
+
+    boolean isReferenced(TransactionLogResource resource);
+
+    boolean isReferenced();
+
     long getMessageNumber();
 
     long getArrivalTime();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug  8 16:23:43 2014
@@ -2549,7 +2549,9 @@ public abstract class AbstractQueue<X ex
                                                                                         final ServerTransaction txn,
                                                                                         final Action<? super MessageInstance> postEnqueueAction)
     {
-            txn.enqueue(this,message, new ServerTransaction.Action()
+        if(!message.isReferenced(this))
+        {
+            txn.enqueue(this, message, new ServerTransaction.Action()
             {
                 MessageReference _reference = message.newReference();
 
@@ -2571,6 +2573,11 @@ public abstract class AbstractQueue<X ex
                 }
             });
             return 1;
+        }
+        else
+        {
+            return 0;
+        }
 
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug  8 16:23:43 2014
@@ -103,7 +103,7 @@ public abstract class QueueEntryImpl imp
     {
         _queueEntryList = queueEntryList;
 
-        _message = message == null ? null : message.newReference();
+        _message = message == null ? null : message.newReference(queueEntryList.getQueue());
 
         _entryIdUpdater.set(this, entryId);
         populateInstanceProperties();
@@ -112,7 +112,7 @@ public abstract class QueueEntryImpl imp
     public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message)
     {
         _queueEntryList = queueEntryList;
-        _message = message == null ? null :  message.newReference();
+        _message = message == null ? null :  message.newReference(queueEntryList.getQueue());
         populateInstanceProperties();
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Fri Aug  8 16:23:43 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
 
 public interface QueueEntryList
 {
-    AMQQueue getQueue();
+    AMQQueue<?> getQueue();
 
     QueueEntry add(ServerMessage message);
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Fri Aug  8 16:23:43 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.exchange;
 
 import static org.apache.qpid.common.AMQPFilterTypes.*;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -43,6 +44,7 @@ import org.apache.qpid.server.model.Queu
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.QueueExistsException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -497,6 +499,7 @@ public class TopicExchangeTest extends Q
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(ref);
+        when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
         when(message.getMessageNumber()).thenReturn(messageNumber);
         for(BaseQueue q : queues)
         {

Added: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java?rev=1616813&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java Fri Aug  8 16:23:43 2014
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AbstractServerMessageTest extends QpidTestCase
+{
+    private static class TestMessage<T extends StorableMessageMetaData> extends AbstractServerMessageImpl<TestMessage<T>,T>
+    {
+
+        public TestMessage(final StoredMessage<T> handle,
+                           final Object connectionReference)
+        {
+            super(handle, connectionReference);
+        }
+
+        @Override
+        public String getInitialRoutingAddress()
+        {
+            return null;
+        }
+
+        @Override
+        public AMQMessageHeader getMessageHeader()
+        {
+            return null;
+        }
+
+        @Override
+        public long getSize()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getExpiration()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getArrivalTime()
+        {
+            return 0;
+        }
+    }
+
+    private TransactionLogResource createQueue(String name)
+    {
+        TransactionLogResource queue = mock(TransactionLogResource.class);
+        when(queue.getId()).thenReturn(UUID.randomUUID());
+        when(queue.getName()).thenReturn(name);
+        return queue;
+    }
+
+    public void testReferences()
+    {
+        TransactionLogResource q1 = createQueue("1");
+        TransactionLogResource q2 = createQueue("2");
+
+        TestMessage<StorableMessageMetaData> msg = new TestMessage<StorableMessageMetaData>(mock(StoredMessage.class),this);
+        assertFalse(msg.isReferenced());
+        assertFalse(msg.isReferenced(q1));
+
+        MessageReference<TestMessage<StorableMessageMetaData>> nonQueueRef = msg.newReference();
+        assertFalse(msg.isReferenced());
+        assertFalse(msg.isReferenced(q1));
+
+        MessageReference<TestMessage<StorableMessageMetaData>> q1ref = msg.newReference(q1);
+        assertTrue(msg.isReferenced());
+        assertTrue(msg.isReferenced(q1));
+        assertFalse(msg.isReferenced(q2));
+
+        q1ref.release();
+        assertFalse(msg.isReferenced());
+        assertFalse(msg.isReferenced(q1));
+
+        q1ref = msg.newReference(q1);
+        assertTrue(msg.isReferenced());
+        assertTrue(msg.isReferenced(q1));
+        assertFalse(msg.isReferenced(q2));
+
+        MessageReference<TestMessage<StorableMessageMetaData>> q2ref = msg.newReference(q2);
+        assertTrue(msg.isReferenced());
+        assertTrue(msg.isReferenced(q1));
+        assertTrue(msg.isReferenced(q2));
+
+        try
+        {
+            msg.newReference(q1);
+            fail("Should not be able to create a second reference to the same queue");
+        }
+        catch (MessageAlreadyReferencedException e)
+        {
+            // pass
+        }
+        q2ref.release();
+        assertTrue(msg.isReferenced());
+        assertTrue(msg.isReferenced(q1));
+        assertFalse(msg.isReferenced(q2));
+
+        q1ref.release();
+        assertFalse(msg.isReferenced());
+        assertFalse(msg.isReferenced(q1));
+
+        nonQueueRef.release();
+
+        try
+        {
+            msg.newReference(q1);
+            fail("Message should not allow new references as all references had been removed");
+        }
+        catch(MessageDeletedException e)
+        {
+            // pass
+        }
+
+    }
+}

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Fri Aug  8 16:23:43 2014
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.contains;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
@@ -60,6 +61,7 @@ import org.apache.qpid.server.model.Queu
 import org.apache.qpid.server.model.QueueNotificationListener;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -1157,6 +1159,7 @@ abstract class AbstractQueueTestBase ext
 
 
         when(message.newReference()).thenReturn(ref);
+        when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
 
         return message;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java Fri Aug  8 16:23:43 2014
@@ -19,6 +19,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class LastValueQueueListTest extends TestCase
@@ -220,6 +222,8 @@ public class LastValueQueueListTest exte
 
         MessageReference messageReference = mock(MessageReference.class);
         when(mockMessage.newReference()).thenReturn(messageReference);
+        when(mockMessage.newReference(any(TransactionLogResource.class))).thenReturn(messageReference);
+
         when(messageReference.getMessage()).thenReturn(mockMessage);
 
         return mockMessage;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Fri Aug  8 16:23:43 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -37,6 +38,7 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -79,6 +81,7 @@ public class PriorityQueueListTest exten
 
             when(message.getMessageHeader()).thenReturn(header);
             when(message.newReference()).thenReturn(ref);
+            when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
             when(ref.getMessage()).thenReturn(message);
             when(header.getPriority()).thenReturn(PRIORITIES[i]);
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Fri Aug  8 16:23:43 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 /**
@@ -148,9 +150,11 @@ public abstract class QueueEntryImplTest
                    _queueEntry.isAcquired());
 
         assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
-        assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
-        assertFalse("Acquisition should not be able to be removed from the wrong consumer",_queueEntry.removeAcquisitionFromConsumer(consumer2));
-        assertTrue("Acquisition should be able to be removed once unlocked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+        assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition());
+        assertFalse("Acquisition should not be able to be removed from the wrong consumer",
+                    _queueEntry.removeAcquisitionFromConsumer(consumer2));
+        assertTrue("Acquisition should be able to be removed once unlocked",
+                   _queueEntry.removeAcquisitionFromConsumer(consumer));
         assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired());
         assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer());
 
@@ -254,6 +258,7 @@ public abstract class QueueEntryImplTest
             final MessageReference reference = mock(MessageReference.class);
             when(reference.getMessage()).thenReturn(message);
             when(message.newReference()).thenReturn(reference);
+            when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
             QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message);
             entries[i] = entry;
         }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Fri Aug  8 16:23:43 2014
@@ -19,14 +19,16 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import junit.framework.TestCase;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 /**
  * Abstract test class for QueueEntryList implementations.
@@ -96,6 +98,7 @@ public abstract class QueueEntryListTest
         AMQMessageHeader hdr = mock(AMQMessageHeader.class);
         when(ref.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(ref);
+        when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
         when(message.getMessageHeader()).thenReturn(hdr);
         return message;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java Fri Aug  8 16:23:43 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -33,6 +34,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -162,6 +164,7 @@ public class QueueMessageRecoveryTest ex
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(msg);
         when(msg.newReference()).thenReturn(ref);
+        when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
         when(msg.getStoredMessage()).thenReturn(mock(StoredMessage.class));
         return msg;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Fri Aug  8 16:23:43 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -35,6 +36,7 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
@@ -70,6 +72,7 @@ public class SimpleQueueEntryImplTest ex
         final MessageReference reference = mock(MessageReference.class);
         when(reference.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(reference);
+        when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
         return (QueueEntryImpl) queueEntryList.add(message);
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Fri Aug  8 16:23:43 2014
@@ -19,6 +19,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -40,6 +41,7 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class SortedQueueEntryListTest extends QueueEntryListTestBase
@@ -180,6 +182,7 @@ public class SortedQueueEntryListTest ex
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(ref);
+        when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
         when(message.getMessageNumber()).thenReturn(id);
 
         return message;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java Fri Aug  8 16:23:43 2014
@@ -19,6 +19,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class SortedQueueEntryTest extends QueueEntryImplTestBase
@@ -97,6 +99,7 @@ public class SortedQueueEntryTest extend
         final MessageReference reference = mock(MessageReference.class);
         when(reference.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(reference);
+        when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
         return _queueEntryList.add(message);
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Fri Aug  8 16:23:43 2014
@@ -20,6 +20,7 @@
 */
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -36,6 +37,7 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class StandardQueueEntryListTest extends QueueEntryListTestBase
@@ -73,6 +75,7 @@ public class StandardQueueEntryListTest 
             MessageReference ref = mock(MessageReference.class);
             when(ref.getMessage()).thenReturn(message);
             when(message.newReference()).thenReturn(ref);
+            when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
 
             final QueueEntry bleh = _sqel.add(message);
             assertNotNull("QE should not have been null", bleh);
@@ -163,6 +166,7 @@ public class StandardQueueEntryListTest 
             MessageReference ref = mock(MessageReference.class);
             when(ref.getMessage()).thenReturn(message);
             when(message.newReference()).thenReturn(ref);
+            when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
             QueueEntry bleh = sqel.add(message);
             assertNotNull("QE should not have been null", bleh);
             entriesMap.put(i,bleh);
@@ -264,6 +268,7 @@ public class StandardQueueEntryListTest 
             final MessageReference reference = mock(MessageReference.class);
             when(reference.getMessage()).thenReturn(message);
             when(message.newReference()).thenReturn(reference);
+            when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
             entries[i] = (OrderedQueueEntry) queueEntryList.add(message);
         }
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Fri Aug  8 16:23:43 2014
@@ -165,6 +165,24 @@ public class TestMessageMetaDataType imp
         }
 
         @Override
+        public MessageReference newReference(final TransactionLogResource object)
+        {
+            return _messageReference;
+        }
+
+        @Override
+        public boolean isReferenced(final TransactionLogResource resource)
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isReferenced()
+        {
+            return false;
+        }
+
+        @Override
         public int hashCode()
         {
             final int prime = 31;

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Fri Aug  8 16:23:43 2014
@@ -20,14 +20,15 @@
  */
 package org.apache.qpid.server.txn;
 
+import java.nio.ByteBuffer;
+
 import org.apache.commons.lang.NotImplementedException;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 /**
  * Mock Server Message allowing its persistent flag to be controlled from test.
@@ -57,6 +58,24 @@ class MockServerMessage implements Serve
         throw new NotImplementedException();
     }
 
+    @Override
+    public MessageReference newReference(final TransactionLogResource object)
+    {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean isReferenced(final TransactionLogResource resource)
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isReferenced()
+    {
+        return false;
+    }
+
     public boolean isImmediate()
     {
         throw new NotImplementedException();
@@ -113,4 +132,4 @@ class MockServerMessage implements Serve
     {
         return 0L;
     }
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java Fri Aug  8 16:23:43 2014
@@ -33,6 +33,7 @@ import javax.servlet.http.HttpServletRes
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
+
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageDeletedException;
@@ -44,6 +45,7 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 public class MessageServlet extends AbstractServlet
 {
@@ -212,7 +214,11 @@ public class MessageServlet extends Abst
         @Override
         protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
         {
-            txn.move(entry, _destinationQueue);
+            ServerMessage msg = entry.getMessage();
+            if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue))
+            {
+                txn.move(entry, _destinationQueue);
+            }
         }
     }
 
@@ -229,7 +235,11 @@ public class MessageServlet extends Abst
         @Override
         protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
         {
-            txn.copy(entry, _destinationQueue);
+            ServerMessage msg = entry.getMessage();
+            if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue))
+            {
+                txn.copy(entry, _destinationQueue);
+            }
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java Fri Aug  8 16:23:43 2014
@@ -59,6 +59,7 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.queue.NotificationCheck;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
@@ -519,7 +520,8 @@ public class QueueMBean extends AMQManag
                             final long messageId = message.getMessageNumber();
 
                             if ((messageId >= fromMessageId)
-                                && (messageId <= toMessageId))
+                                && (messageId <= toMessageId)
+                                && !(message.isReferenced((TransactionLogResource)destinationQueue)))
                             {
                                 txn.move(entry, destinationQueue);
                             }
@@ -571,8 +573,8 @@ public class QueueMBean extends AMQManag
         }
 
         VirtualHost<?,?,?> vhost = _queue.getParent(VirtualHost.class);
-        final Queue<?> queue = vhost.getChildByName(Queue.class, toQueue);
-        if (queue == null)
+        final Queue<?> destinationQueue = vhost.getChildByName(Queue.class, toQueue);
+        if (destinationQueue == null)
         {
             throw new OperationsException("No such queue \""+ toQueue +"\"");
         }
@@ -591,9 +593,10 @@ public class QueueMBean extends AMQManag
                             final long messageId = message.getMessageNumber();
 
                             if ((messageId >= fromMessageId)
-                                && (messageId <= toMessageId))
+                                && (messageId <= toMessageId)
+                                && !(message.isReferenced((TransactionLogResource)destinationQueue)))
                             {
-                                txn.copy(entry, queue);
+                                txn.copy(entry, destinationQueue);
                             }
 
                         }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java Fri Aug  8 16:23:43 2014
@@ -456,6 +456,61 @@ public class QueueManagementTest extends
         assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8);
     }
 
+
+    /**
+     * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface.
+     */
+    public void testCopyMessagesBetweenQueuesWithDuplicates() throws Exception
+    {
+        final int numberOfMessagesToSend = 10;
+        sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+        syncSession(_session);
+        assertEquals("Unexpected queue depth after send",
+                     numberOfMessagesToSend,
+                     _managedSourceQueue.getMessageCount().intValue());
+
+        List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+        // Copy first three messages to destination
+        long fromMessageId = amqMessagesIds.get(0);
+        long toMessageId = amqMessagesIds.get(2);
+        _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+
+        assertEquals("Unexpected queue depth on destination queue after first copy",
+                     3,
+                     _managedDestinationQueue.getMessageCount().intValue());
+        assertEquals("Unexpected queue depth on source queue after first copy",
+                     numberOfMessagesToSend,
+                     _managedSourceQueue.getMessageCount().intValue());
+
+        // Now copy a further two messages to destination
+        fromMessageId = amqMessagesIds.get(7);
+        toMessageId = amqMessagesIds.get(8);
+        _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+        assertEquals("Unexpected queue depth on destination queue after second copy",
+                     5,
+                     _managedDestinationQueue.getMessageCount().intValue());
+        assertEquals("Unexpected queue depth on source queue after second copy",
+                     numberOfMessagesToSend,
+                     _managedSourceQueue.getMessageCount().intValue());
+
+        // Attempt to copy mixture of messages already on and some not already on the queue
+
+        fromMessageId = amqMessagesIds.get(5);
+        toMessageId = amqMessagesIds.get(8);
+        _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+        assertEquals("Unexpected queue depth on destination queue after second copy",
+                     7,
+                     _managedDestinationQueue.getMessageCount().intValue());
+        assertEquals("Unexpected queue depth on source queue after second copy",
+                     numberOfMessagesToSend,
+                     _managedSourceQueue.getMessageCount().intValue());
+
+        assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8, 5, 6);
+
+
+    }
+
     public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception
     {
         setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org