You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/08 18:23:44 UTC
svn commit: r1616813 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/test/java/org/...
Author: rgodfrey
Date: Fri Aug 8 16:23:43 2014
New Revision: 1616813
URL: http://svn.apache.org/r1616813
Log:
QPID-4307 : [Java Broker] prevent the copying/moving of messages onto queues on which the message already exists
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java
- copied, changed from r1616557, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Aug 8 16:23:43 2014
@@ -511,7 +511,7 @@ public abstract class AbstractExchange<T
Exchange altExchange = getAlternateExchange();
if(altExchange != null)
{
- return ((ExchangeImpl)altExchange).send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
+ return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction);
}
else
{
@@ -520,7 +520,24 @@ public abstract class AbstractExchange<T
}
else
{
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ final BaseQueue[] baseQueues;
+
+ if(message.isReferenced())
+ {
+ ArrayList<BaseQueue> uniqueQueues = new ArrayList<>(queues.size());
+ for(BaseQueue q : queues)
+ {
+ if(!message.isReferenced(q))
+ {
+ uniqueQueues.add(q);
+ }
+ }
+ baseQueues = uniqueQueues.toArray(new BaseQueue[uniqueQueues.size()]);
+ }
+ else
+ {
+ baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+ }
txn.enqueue(queues,message, new ServerTransaction.Action()
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Fri Aug 8 16:23:43 2014
@@ -21,10 +21,17 @@
package org.apache.qpid.server.message;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
@@ -33,10 +40,14 @@ public abstract class AbstractServerMess
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+ private static final AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> _resourcesUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, Collection.class,"_resources");
+
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
private final Object _connectionReference;
+ private volatile Collection<UUID> _resources;
public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
@@ -117,6 +128,26 @@ public abstract class AbstractServerMess
}
@Override
+ final public MessageReference<X> newReference(TransactionLogResource object)
+ {
+ return new Reference(this, object);
+ }
+
+ @Override
+ final public boolean isReferenced(TransactionLogResource resource)
+ {
+ Collection<UUID> resources = _resources;
+ return resources != null && resources.contains(resource.getId());
+ }
+
+ @Override
+ final public boolean isReferenced()
+ {
+ Collection<UUID> resources = _resources;
+ return resources != null && !resources.isEmpty();
+ }
+
+ @Override
final public boolean isPersistent()
{
return _handle.getMetaData().isPersistent();
@@ -156,15 +187,52 @@ public abstract class AbstractServerMess
AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released");
private AbstractServerMessageImpl<X, T> _message;
+ private final UUID _resourceId;
private volatile int _released;
private Reference(final AbstractServerMessageImpl<X, T> message)
{
+ this(message, null);
+ }
+ private Reference(final AbstractServerMessageImpl<X, T> message, TransactionLogResource resource)
+ {
_message = message;
+ if(resource != null)
+ {
+ Collection<UUID> currentValue;
+ Collection<UUID> newValue;
+ _resourceId = resource.getId();
+ do
+ {
+ currentValue = _message._resources;
+
+ if(currentValue == null)
+ {
+ newValue = Collections.singleton(_resourceId);
+ }
+ else
+ {
+ if(currentValue.contains(_resourceId))
+ {
+ throw new MessageAlreadyReferencedException(_message.getMessageNumber(), resource);
+ }
+ newValue = new ArrayList<>(currentValue.size()+1);
+ newValue.addAll(currentValue);
+ newValue.add(_resourceId);
+ }
+
+ }
+ while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+ }
+ else
+ {
+ _resourceId = null;
+ }
if(!_message.incrementReference())
{
throw new MessageDeletedException(message.getMessageNumber());
}
+
}
public X getMessage()
@@ -176,6 +244,34 @@ public abstract class AbstractServerMess
{
if(_releasedUpdater.compareAndSet(this,0,1))
{
+ if(_resourceId != null)
+ {
+ Collection<UUID> currentValue;
+ Collection<UUID> newValue;
+ do
+ {
+ currentValue = _message._resources;
+ if(currentValue.size() == 1)
+ {
+ newValue = null;
+ }
+ else
+ {
+ UUID[] array = new UUID[currentValue.size()-1];
+ int pos = 0;
+ for(UUID uuid : currentValue)
+ {
+ if(!_resourceId.equals(uuid))
+ {
+ array[pos++] = uuid;
+ }
+ }
+ newValue = Arrays.asList(array);
+ }
+ }
+ while(!_resourcesUpdater.compareAndSet(_message, currentValue, newValue));
+
+ }
_message.decrementReference();
}
}
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java (from r1616557, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java&r1=1616557&r2=1616813&rev=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDeletedException.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageAlreadyReferencedException.java Fri Aug 8 16:23:43 2014
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.server.message;
-public class MessageDeletedException extends RuntimeException
+import org.apache.qpid.server.store.TransactionLogResource;
+
+public class MessageAlreadyReferencedException extends RuntimeException
{
- MessageDeletedException(final long messageNumber)
+ MessageAlreadyReferencedException(final long messageNumber, TransactionLogResource resource)
{
- super("The message with id " + messageNumber + " has already been deleted, no new reference can be taken");
+ super("The message with id " + messageNumber + " is already referenced by resource " + resource.getName());
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Fri Aug 8 16:23:43 2014
@@ -20,10 +20,11 @@
*/
package org.apache.qpid.server.message;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueueableMessage, MessageContentSource
{
@@ -41,6 +42,12 @@ public interface ServerMessage<T extends
MessageReference newReference();
+ MessageReference newReference(TransactionLogResource object);
+
+ boolean isReferenced(TransactionLogResource resource);
+
+ boolean isReferenced();
+
long getMessageNumber();
long getArrivalTime();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug 8 16:23:43 2014
@@ -2549,7 +2549,9 @@ public abstract class AbstractQueue<X ex
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
- txn.enqueue(this,message, new ServerTransaction.Action()
+ if(!message.isReferenced(this))
+ {
+ txn.enqueue(this, message, new ServerTransaction.Action()
{
MessageReference _reference = message.newReference();
@@ -2571,6 +2573,11 @@ public abstract class AbstractQueue<X ex
}
});
return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug 8 16:23:43 2014
@@ -103,7 +103,7 @@ public abstract class QueueEntryImpl imp
{
_queueEntryList = queueEntryList;
- _message = message == null ? null : message.newReference();
+ _message = message == null ? null : message.newReference(queueEntryList.getQueue());
_entryIdUpdater.set(this, entryId);
populateInstanceProperties();
@@ -112,7 +112,7 @@ public abstract class QueueEntryImpl imp
public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
- _message = message == null ? null : message.newReference();
+ _message = message == null ? null : message.newReference(queueEntryList.getQueue());
populateInstanceProperties();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Fri Aug 8 16:23:43 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
public interface QueueEntryList
{
- AMQQueue getQueue();
+ AMQQueue<?> getQueue();
QueueEntry add(ServerMessage message);
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Fri Aug 8 16:23:43 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.exchange;
import static org.apache.qpid.common.AMQPFilterTypes.*;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -43,6 +44,7 @@ import org.apache.qpid.server.model.Queu
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -497,6 +499,7 @@ public class TopicExchangeTest extends Q
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageNumber()).thenReturn(messageNumber);
for(BaseQueue q : queues)
{
Added: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java?rev=1616813&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java Fri Aug 8 16:23:43 2014
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class AbstractServerMessageTest extends QpidTestCase
+{
+ private static class TestMessage<T extends StorableMessageMetaData> extends AbstractServerMessageImpl<TestMessage<T>,T>
+ {
+
+ public TestMessage(final StoredMessage<T> handle,
+ final Object connectionReference)
+ {
+ super(handle, connectionReference);
+ }
+
+ @Override
+ public String getInitialRoutingAddress()
+ {
+ return null;
+ }
+
+ @Override
+ public AMQMessageHeader getMessageHeader()
+ {
+ return null;
+ }
+
+ @Override
+ public long getSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getArrivalTime()
+ {
+ return 0;
+ }
+ }
+
+ private TransactionLogResource createQueue(String name)
+ {
+ TransactionLogResource queue = mock(TransactionLogResource.class);
+ when(queue.getId()).thenReturn(UUID.randomUUID());
+ when(queue.getName()).thenReturn(name);
+ return queue;
+ }
+
+ public void testReferences()
+ {
+ TransactionLogResource q1 = createQueue("1");
+ TransactionLogResource q2 = createQueue("2");
+
+ TestMessage<StorableMessageMetaData> msg = new TestMessage<StorableMessageMetaData>(mock(StoredMessage.class),this);
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> nonQueueRef = msg.newReference();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> q1ref = msg.newReference(q1);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ q1ref.release();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ q1ref = msg.newReference(q1);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ MessageReference<TestMessage<StorableMessageMetaData>> q2ref = msg.newReference(q2);
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertTrue(msg.isReferenced(q2));
+
+ try
+ {
+ msg.newReference(q1);
+ fail("Should not be able to create a second reference to the same queue");
+ }
+ catch (MessageAlreadyReferencedException e)
+ {
+ // pass
+ }
+ q2ref.release();
+ assertTrue(msg.isReferenced());
+ assertTrue(msg.isReferenced(q1));
+ assertFalse(msg.isReferenced(q2));
+
+ q1ref.release();
+ assertFalse(msg.isReferenced());
+ assertFalse(msg.isReferenced(q1));
+
+ nonQueueRef.release();
+
+ try
+ {
+ msg.newReference(q1);
+ fail("Message should not allow new references as all references had been removed");
+ }
+ catch(MessageDeletedException e)
+ {
+ // pass
+ }
+
+ }
+}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Fri Aug 8 16:23:43 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
@@ -60,6 +61,7 @@ import org.apache.qpid.server.model.Queu
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -1157,6 +1159,7 @@ abstract class AbstractQueueTestBase ext
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
return message;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java Fri Aug 8 16:23:43 2014
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.Conf
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class LastValueQueueListTest extends TestCase
@@ -220,6 +222,8 @@ public class LastValueQueueListTest exte
MessageReference messageReference = mock(MessageReference.class);
when(mockMessage.newReference()).thenReturn(messageReference);
+ when(mockMessage.newReference(any(TransactionLogResource.class))).thenReturn(messageReference);
+
when(messageReference.getMessage()).thenReturn(mockMessage);
return mockMessage;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Fri Aug 8 16:23:43 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,6 +38,7 @@ import org.apache.qpid.server.model.Conf
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -79,6 +81,7 @@ public class PriorityQueueListTest exten
when(message.getMessageHeader()).thenReturn(header);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(ref.getMessage()).thenReturn(message);
when(header.getPriority()).thenReturn(PRIORITIES[i]);
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Fri Aug 8 16:23:43 2014
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
/**
@@ -148,9 +150,11 @@ public abstract class QueueEntryImplTest
_queueEntry.isAcquired());
assertFalse("Acquisition should initially be locked",_queueEntry.removeAcquisitionFromConsumer(consumer));
- assertTrue("Should be able to unlock locked queue entry",_queueEntry.unlockAcquisition());
- assertFalse("Acquisition should not be able to be removed from the wrong consumer",_queueEntry.removeAcquisitionFromConsumer(consumer2));
- assertTrue("Acquisition should be able to be removed once unlocked",_queueEntry.removeAcquisitionFromConsumer(consumer));
+ assertTrue("Should be able to unlock locked queue entry", _queueEntry.unlockAcquisition());
+ assertFalse("Acquisition should not be able to be removed from the wrong consumer",
+ _queueEntry.removeAcquisitionFromConsumer(consumer2));
+ assertTrue("Acquisition should be able to be removed once unlocked",
+ _queueEntry.removeAcquisitionFromConsumer(consumer));
assertTrue("Queue Entry should still be acquired", _queueEntry.isAcquired());
assertFalse("Queue Entry should not be marked as acquired by a consumer", _queueEntry.acquiredByConsumer());
@@ -254,6 +258,7 @@ public abstract class QueueEntryImplTest
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
QueueEntryImpl entry = (QueueEntryImpl) queueEntryList.add(message);
entries[i] = entry;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Fri Aug 8 16:23:43 2014
@@ -19,14 +19,16 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import junit.framework.TestCase;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* Abstract test class for QueueEntryList implementations.
@@ -96,6 +98,7 @@ public abstract class QueueEntryListTest
AMQMessageHeader hdr = mock(AMQMessageHeader.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageHeader()).thenReturn(hdr);
return message;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java Fri Aug 8 16:23:43 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -162,6 +164,7 @@ public class QueueMessageRecoveryTest ex
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(msg);
when(msg.newReference()).thenReturn(ref);
+ when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(msg.getStoredMessage()).thenReturn(mock(StoredMessage.class));
return msg;
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Fri Aug 8 16:23:43 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -35,6 +36,7 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
@@ -70,6 +72,7 @@ public class SimpleQueueEntryImplTest ex
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
return (QueueEntryImpl) queueEntryList.add(message);
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Fri Aug 8 16:23:43 2014
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,6 +41,7 @@ import org.apache.qpid.server.model.Conf
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SortedQueueEntryListTest extends QueueEntryListTestBase
@@ -180,6 +182,7 @@ public class SortedQueueEntryListTest ex
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
when(message.getMessageNumber()).thenReturn(id);
return message;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java Fri Aug 8 16:23:43 2014
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.model.Conf
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SortedQueueEntryTest extends QueueEntryImplTestBase
@@ -97,6 +99,7 @@ public class SortedQueueEntryTest extend
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
return _queueEntryList.add(message);
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Fri Aug 8 16:23:43 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -36,6 +37,7 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class StandardQueueEntryListTest extends QueueEntryListTestBase
@@ -73,6 +75,7 @@ public class StandardQueueEntryListTest
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
final QueueEntry bleh = _sqel.add(message);
assertNotNull("QE should not have been null", bleh);
@@ -163,6 +166,7 @@ public class StandardQueueEntryListTest
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
QueueEntry bleh = sqel.add(message);
assertNotNull("QE should not have been null", bleh);
entriesMap.put(i,bleh);
@@ -264,6 +268,7 @@ public class StandardQueueEntryListTest
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
entries[i] = (OrderedQueueEntry) queueEntryList.add(message);
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Fri Aug 8 16:23:43 2014
@@ -165,6 +165,24 @@ public class TestMessageMetaDataType imp
}
@Override
+ public MessageReference newReference(final TransactionLogResource object)
+ {
+ return _messageReference;
+ }
+
+ @Override
+ public boolean isReferenced(final TransactionLogResource resource)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isReferenced()
+ {
+ return false;
+ }
+
+ @Override
public int hashCode()
{
final int prime = 31;
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Fri Aug 8 16:23:43 2014
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.server.txn;
+import java.nio.ByteBuffer;
+
import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.StoredMessage;
-
-import java.nio.ByteBuffer;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* Mock Server Message allowing its persistent flag to be controlled from test.
@@ -57,6 +58,24 @@ class MockServerMessage implements Serve
throw new NotImplementedException();
}
+ @Override
+ public MessageReference newReference(final TransactionLogResource object)
+ {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean isReferenced(final TransactionLogResource resource)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isReferenced()
+ {
+ return false;
+ }
+
public boolean isImmediate()
{
throw new NotImplementedException();
@@ -113,4 +132,4 @@ class MockServerMessage implements Serve
{
return 0L;
}
-}
\ No newline at end of file
+}
Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java Fri Aug 8 16:23:43 2014
@@ -33,6 +33,7 @@ import javax.servlet.http.HttpServletRes
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
+
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageDeletedException;
@@ -44,6 +45,7 @@ import org.apache.qpid.server.queue.Queu
import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.store.TransactionLogResource;
public class MessageServlet extends AbstractServlet
{
@@ -212,7 +214,11 @@ public class MessageServlet extends Abst
@Override
protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
{
- txn.move(entry, _destinationQueue);
+ ServerMessage msg = entry.getMessage();
+ if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue))
+ {
+ txn.move(entry, _destinationQueue);
+ }
}
}
@@ -229,7 +235,11 @@ public class MessageServlet extends Abst
@Override
protected void updateEntry(QueueEntry entry, VirtualHost.Transaction txn)
{
- txn.copy(entry, _destinationQueue);
+ ServerMessage msg = entry.getMessage();
+ if(msg != null && !msg.isReferenced((TransactionLogResource)_destinationQueue))
+ {
+ txn.copy(entry, _destinationQueue);
+ }
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java Fri Aug 8 16:23:43 2014
@@ -59,6 +59,7 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.queue.NotificationCheck;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
@@ -519,7 +520,8 @@ public class QueueMBean extends AMQManag
final long messageId = message.getMessageNumber();
if ((messageId >= fromMessageId)
- && (messageId <= toMessageId))
+ && (messageId <= toMessageId)
+ && !(message.isReferenced((TransactionLogResource)destinationQueue)))
{
txn.move(entry, destinationQueue);
}
@@ -571,8 +573,8 @@ public class QueueMBean extends AMQManag
}
VirtualHost<?,?,?> vhost = _queue.getParent(VirtualHost.class);
- final Queue<?> queue = vhost.getChildByName(Queue.class, toQueue);
- if (queue == null)
+ final Queue<?> destinationQueue = vhost.getChildByName(Queue.class, toQueue);
+ if (destinationQueue == null)
{
throw new OperationsException("No such queue \""+ toQueue +"\"");
}
@@ -591,9 +593,10 @@ public class QueueMBean extends AMQManag
final long messageId = message.getMessageNumber();
if ((messageId >= fromMessageId)
- && (messageId <= toMessageId))
+ && (messageId <= toMessageId)
+ && !(message.isReferenced((TransactionLogResource)destinationQueue)))
{
- txn.copy(entry, queue);
+ txn.copy(entry, destinationQueue);
}
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java?rev=1616813&r1=1616812&r2=1616813&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/QueueManagementTest.java Fri Aug 8 16:23:43 2014
@@ -456,6 +456,61 @@ public class QueueManagementTest extends
assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8);
}
+
+ /**
+ * Tests that {@link ManagedQueue#copyMessages(long, long, String)} skips messages already present on the destination queue.
+ */
+ public void testCopyMessagesBetweenQueuesWithDuplicates() throws Exception
+ {
+ final int numberOfMessagesToSend = 10;
+ sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
+ syncSession(_session);
+ assertEquals("Unexpected queue depth after send",
+ numberOfMessagesToSend,
+ _managedSourceQueue.getMessageCount().intValue());
+
+ List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
+
+ // Copy first three messages to destination
+ long fromMessageId = amqMessagesIds.get(0);
+ long toMessageId = amqMessagesIds.get(2);
+ _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+
+ assertEquals("Unexpected queue depth on destination queue after first copy",
+ 3,
+ _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after first copy",
+ numberOfMessagesToSend,
+ _managedSourceQueue.getMessageCount().intValue());
+
+ // Now copy a further two messages to destination
+ fromMessageId = amqMessagesIds.get(7);
+ toMessageId = amqMessagesIds.get(8);
+ _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+ assertEquals("Unexpected queue depth on destination queue after second copy",
+ 5,
+ _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after second copy",
+ numberOfMessagesToSend,
+ _managedSourceQueue.getMessageCount().intValue());
+
+ // Attempt to copy a mixture of messages: some already on the destination queue (indices 7, 8) and some not (indices 5, 6)
+
+ fromMessageId = amqMessagesIds.get(5);
+ toMessageId = amqMessagesIds.get(8);
+ _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
+ assertEquals("Unexpected queue depth on destination queue after third copy",
+ 7,
+ _managedDestinationQueue.getMessageCount().intValue());
+ assertEquals("Unexpected queue depth on source queue after third copy",
+ numberOfMessagesToSend,
+ _managedSourceQueue.getMessageCount().intValue());
+
+ assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8, 5, 6);
+
+
+ }
+
public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception
{
setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org