You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC
svn commit: r1567616 [10/12] - in /qpid/branches/java-broker-bdb-ha: ./
qpid/ qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/
qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/
qpid/cpp/bindings/qpid/ruby/ q...
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Wed Feb 12 13:27:51 2014
@@ -28,11 +28,11 @@ import org.apache.qpid.framing.ContentHe
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.flow.LimitlessCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.test.utils.QpidTestCase;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.Set;
/**
@@ -47,7 +48,8 @@ import java.util.Set;
*/
public class AckTest extends QpidTestCase
{
- private Subscription _subscription;
+ private ConsumerTarget_0_8 _subscriptionTarget;
+ private Consumer _consumer;
private AMQProtocolSession _protocolSession;
@@ -86,7 +88,6 @@ public class AckTest extends QpidTestCas
private void publishMessages(int count, boolean persistent) throws AMQException
{
- _queue.registerSubscription(_subscription,false);
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -144,7 +145,7 @@ public class AckTest extends QpidTestCas
try
{
- _queue.enqueue(message);
+ _queue.enqueue(message,null);
}
catch (AMQException e)
{
@@ -178,7 +179,13 @@ public class AckTest extends QpidTestCas
*/
public void testAckChannelAssociationTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -190,8 +197,8 @@ public class AckTest extends QpidTestCas
{
assertTrue(deliveryTag == i);
i++;
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
}
}
@@ -202,7 +209,16 @@ public class AckTest extends QpidTestCas
public void testNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget,
+ null,
+ AMQMessage.class,
+ DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -218,7 +234,13 @@ public class AckTest extends QpidTestCas
public void testPersistentNoAckMode() throws AMQException
{
// false arg means no acks expected
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
final int msgCount = 10;
publishMessages(msgCount, true);
@@ -235,7 +257,15 @@ public class AckTest extends QpidTestCas
*/
public void testSingleAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -248,8 +278,8 @@ public class AckTest extends QpidTestCas
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
// 5 is the delivery tag of the message that *should* be removed
if (++i == 5)
{
@@ -264,7 +294,15 @@ public class AckTest extends QpidTestCas
*/
public void testMultiAckReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -279,8 +317,8 @@ public class AckTest extends QpidTestCas
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
++i;
}
}
@@ -290,7 +328,15 @@ public class AckTest extends QpidTestCas
*/
public void testMultiAckAllReceivedTest() throws AMQException
{
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+ DEFAULT_CONSUMER_TAG,
+ null,
+ new LimitlessCreditManager());
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 10;
publishMessages(msgCount);
@@ -303,8 +349,8 @@ public class AckTest extends QpidTestCas
for (long deliveryTag : deliveryTagSet)
{
assertTrue(deliveryTag == i + 5);
- QueueEntry unackedMsg = map.get(deliveryTag);
- assertTrue(unackedMsg.getQueue() == _queue);
+ MessageInstance unackedMsg = map.get(deliveryTag);
+ assertTrue(unackedMsg.getOwningResource() == _queue);
++i;
}
}
@@ -319,12 +365,16 @@ public class AckTest extends QpidTestCas
// Send 10 messages
Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
- _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
- DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+
+ _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+ _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
+
final int msgCount = 1;
publishMessages(msgCount);
- _queue.deliverAsync(_subscription);
+ _consumer.externalStateChange();
_channel.acknowledgeMessage(1, false);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Wed Feb 12 13:27:51 2014
@@ -24,7 +24,7 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -36,7 +36,7 @@ import java.util.List;
public class AcknowledgeTest extends QpidTestCase
{
private AMQChannel _channel;
- private SimpleAMQQueue _queue;
+ private AMQQueue _queue;
private MessageStore _messageStore;
private String _queueName;
@@ -79,7 +79,7 @@ public class AcknowledgeTest extends Qpi
return (InternalTestProtocolSession)_channel.getProtocolSession();
}
- private SimpleAMQQueue getQueue()
+ private AMQQueue getQueue()
{
return _queue;
}
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends Qpi
assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
//Subscribe to the queue
- AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
+ AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true, false);
getQueue().deliverAsync();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Wed Feb 12 13:27:51 2014
@@ -23,20 +23,22 @@ package org.apache.qpid.server.protocol.
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.SimpleQueueEntryList;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
*
@@ -59,40 +61,50 @@ public class ExtractResendAndRequeueTest
private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
private static final int INITIAL_MSG_COUNT = 10;
- private AMQQueue _queue = new MockAMQQueue(getName());
- private MessageStore _messageStore = new TestMemoryMessageStore();
- private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+ private AMQQueue _queue;
+ private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
+ private Consumer _consumer;
+ private boolean _queueDeleted;
@Override
public void setUp() throws AMQException
{
+ _queueDeleted = false;
_unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+ _queue = mock(AMQQueue.class);
+ when(_queue.getName()).thenReturn(getName());
+ when(_queue.isDeleted()).thenReturn(_queueDeleted);
+ _consumer = mock(Consumer.class);
+ when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+
long id = 0;
- SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);
// Add initial messages to QueueEntryList
for (int count = 0; count < INITIAL_MSG_COUNT; count++)
{
- AMQMessage msg = new MockAMQMessage(id);
-
- list.add(msg);
+ ServerMessage msg = mock(ServerMessage.class);
+ when(msg.getMessageNumber()).thenReturn(id);
+ final QueueEntry entry = mock(QueueEntry.class);
+ when(entry.getMessage()).thenReturn(msg);
+ when(entry.getQueue()).thenReturn(_queue);
+ when(entry.isQueueDeleted()).thenReturn(_queueDeleted);
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws Throwable
+ {
+ when(entry.isDeleted()).thenReturn(true);
+ return null;
+ }
+ }).when(entry).delete();
+ _unacknowledgedMessageMap.add(id, entry);
+ _referenceList.add(entry);
//Increment ID;
id++;
}
- // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList
- QueueEntryIterator queueEntries = list.iterator();
- while(queueEntries.advance())
- {
- QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
-
- // Store the entry for future inspection
- _referenceList.add(entry);
- }
-
assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
}
@@ -103,17 +115,14 @@ public class ExtractResendAndRequeueTest
*
* @return Subscription that performed the acquire
*/
- private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
+ private void acquireMessages(LinkedList<MessageInstance> messageList)
{
- Subscription subscription = new MockSubscription();
- // Aquire messages in subscription
- for (QueueEntry entry : messageList)
+ // Acquire messages in subscription
+ for(MessageInstance entry : messageList)
{
- entry.acquire(subscription);
+ when(entry.getDeliveredConsumer()).thenReturn(_consumer);
}
-
- return subscription;
}
/**
@@ -128,14 +137,14 @@ public class ExtractResendAndRequeueTest
public void testResend() throws AMQException
{
//We don't need the subscription object here.
- createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
// requeueIfUnableToResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
+ msgToResend));
assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -154,100 +163,22 @@ public class ExtractResendAndRequeueTest
*/
public void testRequeueDueToSubscriptionClosure() throws AMQException
{
- Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
+ acquireMessages(_referenceList);
// Close subscription
- subscription.close();
+ when(_consumer.isClosed()).thenReturn(true);
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
// requeueIfUnableToResend doesn't matter here.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued
- * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnableToResend = true so all messages should go to msgToRequeue
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, true, _messageStore));
+ msgToResend));
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
}
- /**
- * If the subscription is null, due to message being retrieved via a GET, And we request that we don't
- * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue.
- *
- * @throws AMQException the visit interface throws this
- */
-
- public void testDrop() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
-
- }
-
- /**
- * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was
- * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the
- * future we may wish to dead letter the message.
- *
- * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
- *
- * @throws AMQException the visit interface throws this
- */
- public void testDiscard() throws AMQException
- {
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
- _queue.delete();
-
- // requeueIfUnableToResend : value doesn't matter here as queue has been deleted
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, false, _messageStore));
-
- assertEquals("Message count for resend not correct.", 0, msgToResend.size());
- assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
- for (QueueEntry entry : _referenceList)
- {
- assertTrue("Message was not discarded", entry.isDeleted());
- }
- }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Wed Feb 12 13:27:51 2014
@@ -47,11 +47,9 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -60,7 +58,7 @@ public class InternalTestProtocolSession
{
private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class);
// ChannelID(LIST) -> LinkedList<Pair>
- private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+ private final Map<Integer, Map<String, LinkedList<DeliveryPair>>> _channelDelivers;
private AtomicInteger _deliveryCount = new AtomicInteger(0);
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
@@ -68,7 +66,7 @@ public class InternalTestProtocolSession
{
super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
- _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+ _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>();
setTestAuthorizedSubject();
setVirtualHost(virtualHost);
@@ -117,7 +115,7 @@ public class InternalTestProtocolSession
{
synchronized (_channelDelivers)
{
- List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+ List<DeliveryPair> all =_channelDelivers.get(channelId).get(AMQShortString.toString(consumerTag));
if (all == null)
{
@@ -153,23 +151,23 @@ public class InternalTestProtocolSession
synchronized (_channelDelivers)
{
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+ Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
if (consumers == null)
{
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ consumers = new HashMap<String, LinkedList<DeliveryPair>>();
_channelDelivers.put(channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(AMQShortString.toString(consumerTag));
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(consumerTag, consumerDelivers);
+ consumers.put(consumerTag.toString(), consumerDelivers);
}
- consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
+ consumerDelivers.add(new DeliveryPair(deliveryTag, msg));
}
}
@@ -247,27 +245,27 @@ public class InternalTestProtocolSession
@Override
- public void deliverToClient(Subscription sub, ServerMessage message,
+ public void deliverToClient(Consumer sub, ServerMessage message,
InstanceProperties props, long deliveryTag) throws AMQException
{
_deliveryCount.incrementAndGet();
synchronized (_channelDelivers)
{
- Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+ Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
if (consumers == null)
{
- consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ consumers = new HashMap<String, LinkedList<DeliveryPair>>();
_channelDelivers.put(_channelId, consumers);
}
- LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getName());
if (consumerDelivers == null)
{
consumerDelivers = new LinkedList<DeliveryPair>();
- consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
+ consumers.put(sub.getName(), consumerDelivers);
}
consumerDelivers.add(new DeliveryPair(deliveryTag, message));
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Wed Feb 12 13:27:51 2014
@@ -26,10 +26,8 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -39,7 +37,7 @@ import java.util.List;
public class QueueBrowserUsesNoAckTest extends QpidTestCase
{
private AMQChannel _channel;
- private SimpleAMQQueue _queue;
+ private AMQQueue _queue;
private MessageStore _messageStore;
private String _queueName;
@@ -82,7 +80,7 @@ public class QueueBrowserUsesNoAckTest e
return (InternalTestProtocolSession)_channel.getProtocolSession();
}
- private SimpleAMQQueue getQueue()
+ private AMQQueue getQueue()
{
return _queue;
}
@@ -130,8 +128,7 @@ public class QueueBrowserUsesNoAckTest e
//Check the process didn't suspend the subscription as this would
// indicate we are using the prefetch credit. i.e. using acks not No-Ack
assertTrue("The subscription has been suspended",
- !getChannel().getSubscription(browser).getState()
- .equals(Subscription.State.SUSPENDED));
+ !getChannel().getSubscription(browser).isSuspended());
}
private void checkStoreContents(int messageCount)
@@ -144,6 +141,6 @@ public class QueueBrowserUsesNoAckTest e
FieldTable filters = new FieldTable();
filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
- return channel.subscribeToQueue(null, queue, true, filters, false, true);
+ return channel.consumeFromSource(null, queue, true, filters, true, false);
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,36 +16,37 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid AMQP 1-0 Protocol Broker Plug-in</name>
+ <description>AMQP 1-0 protocol broker plug-in</description>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-common</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>${log4j-version}</version>
- <scope>compile</scope>
</dependency>
</dependencies>
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
('svn:mergeinfo' removed)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Wed Feb 12 13:27:51 2014
@@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -53,16 +54,8 @@ public class Connection_1_0 implements C
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
private final Object _reference = new Object();
-
-
- public static interface Task
- {
- public void doTask(Connection_1_0 connection);
- }
-
-
- private List<Task> _closeTasks =
- Collections.synchronizedList(new ArrayList<Task>());
+ private List<Action<Connection_1_0>> _closeTasks =
+ Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
@@ -98,26 +91,26 @@ public class Connection_1_0 implements C
_sessions.remove(session);
}
- void removeConnectionCloseTask(final Task task)
+ void removeConnectionCloseTask(final Action<Connection_1_0> task)
{
_closeTasks.remove( task );
}
- void addConnectionCloseTask(final Task task)
+ void addConnectionCloseTask(final Action<Connection_1_0> task)
{
_closeTasks.add( task );
}
public void closeReceived()
{
- List<Task> taskCopy;
+ List<Action<Connection_1_0>> taskCopy;
synchronized (_closeTasks)
{
- taskCopy = new ArrayList<Task>(_closeTasks);
+ taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
}
- for(Task task : taskCopy)
+ for(Action<Connection_1_0> task : taskCopy)
{
- task.doTask(this);
+ task.performAction(this);
}
synchronized (_closeTasks)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Wed Feb 12 13:27:51 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
@@ -157,7 +156,7 @@ public abstract class MessageConverter_t
}
}
- private static Map fixMapValues(final Map<String, Object> map)
+ static Map fixMapValues(final Map<String, Object> map)
{
for(Map.Entry<String,Object> entry : map.entrySet())
{
@@ -166,7 +165,7 @@ public abstract class MessageConverter_t
return map;
}
- private static Object fixValue(final Object value)
+ static Object fixValue(final Object value)
{
if(value instanceof byte[])
{
@@ -186,7 +185,7 @@ public abstract class MessageConverter_t
}
}
- private static List fixListValues(final List list)
+ static List fixListValues(final List list)
{
ListIterator iterator = list.listIterator();
while(iterator.hasNext())
@@ -199,83 +198,88 @@ public abstract class MessageConverter_t
}
private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
- final ServerMessage serverMessage,
+ final M serverMessage,
SectionEncoder sectionEncoder)
{
- final String mimeType = serverMessage.getMessageHeader().getMimeType();
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getContent(ByteBuffer.wrap(data), 0);
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ Section bodySection = getBodySection(serverMessage, mimeType);
- Section bodySection = convertMessageBody(mimeType, data);
+ final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
- final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
-
- return new StoredMessage<MessageMetaData_1_0>()
- {
- @Override
- public MessageMetaData_1_0 getMetaData()
- {
- return metaData;
- }
-
- @Override
- public long getMessageNumber()
- {
- return serverMessage.getMessageNumber();
- }
-
- @Override
- public void addContent(int offsetInMessage, ByteBuffer src)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- int size;
- if(dst.remaining()<buf.remaining())
- {
- buf.limit(dst.remaining());
- size = dst.remaining();
- }
- else
+ return new StoredMessage<MessageMetaData_1_0>()
{
- size = buf.remaining();
- }
- dst.put(buf);
- return size;
- }
-
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
- {
- ByteBuffer buf = allData.duplicate();
- buf.position(offsetInMessage);
- buf = buf.slice();
- if(size < buf.remaining())
- {
- buf.limit(size);
- }
- return buf;
- }
+ @Override
+ public MessageMetaData_1_0 getMetaData()
+ {
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMessage.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ int size;
+ if(dst.remaining()<buf.remaining())
+ {
+ buf.limit(dst.remaining());
+ size = dst.remaining();
+ }
+ else
+ {
+ size = buf.remaining();
+ }
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ if(size < buf.remaining())
+ {
+ buf.limit(size);
+ }
+ return buf;
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
- @Override
- public StoreFuture flushToStore()
- {
- throw new UnsupportedOperationException();
- }
+ protected Section getBodySection(final M serverMessage, final String mimeType)
+ {
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
- @Override
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
+ return convertMessageBody(mimeType, data);
+ }
private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
{
@@ -286,7 +290,7 @@ public abstract class MessageConverter_t
Binary dataEncoding = sectionEncoder.getEncoding();
final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
- metaData.writeToBuffer(0,allData);
+ metaData.writeToBuffer(allData);
allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
return allData;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Wed Feb 12 13:27:51 2014
@@ -314,7 +314,7 @@ public class MessageMetaData_1_0 impleme
return buf;
}
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
ByteBuffer buf = _encoded;
@@ -326,7 +326,7 @@ public class MessageMetaData_1_0 impleme
buf = buf.duplicate();
- buf.position(offsetInMetaData);
+ buf.position(0);
if(dest.remaining() < buf.limit())
{
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
('svn:mergeinfo' removed)
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
('svn:mergeinfo' removed)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Wed Feb 12 13:27:51 2014
@@ -24,22 +24,21 @@ import org.apache.log4j.Logger;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.ServerTransaction;
-public class QueueDestination implements SendingDestination, ReceivingDestination
+public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
{
private static final Logger _logger = Logger.getLogger(QueueDestination.class);
private static final Accepted ACCEPTED = new Accepted();
private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
- private AMQQueue _queue;
-
public QueueDestination(AMQQueue queue)
{
- _queue = queue;
+ super(queue);
}
public Outcome[] getOutcomes()
@@ -52,7 +51,7 @@ public class QueueDestination implements
try
{
- txn.enqueue(_queue,message, new ServerTransaction.Action()
+ txn.enqueue(getQueue(),message, new ServerTransaction.Action()
{
@@ -60,8 +59,7 @@ public class QueueDestination implements
{
try
{
-
- _queue.enqueue(message);
+ getQueue().enqueue(message,null);
}
catch (Exception e)
{
@@ -93,7 +91,7 @@ public class QueueDestination implements
public AMQQueue getQueue()
{
- return _queue;
+ return (AMQQueue) super.getQueue();
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Feb 12 13:27:51 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,16 +43,7 @@ import org.apache.qpid.amqp_1_0.type.Del
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
-import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
@@ -64,11 +56,14 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
@@ -78,18 +73,22 @@ public class SendingLink_1_0 implements
private VirtualHost _vhost;
private SendingDestination _destination;
- private Subscription_1_0 _subscription;
+ private Consumer _consumer;
+ private ConsumerTarget_1_0 _target;
+
private boolean _draining;
- private final Map<Binary, QueueEntry> _unsettledMap =
- new HashMap<Binary, QueueEntry>();
+ private final Map<Binary, MessageInstance> _unsettledMap =
+ new HashMap<Binary, MessageInstance>();
private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
new ConcurrentHashMap<Binary, UnsettledAction>();
private volatile SendingLinkAttachment _linkAttachment;
private TerminusDurability _durability;
- private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+ private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
private Runnable _closeAction;
+ private final MessageSource _queue;
+
public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
final VirtualHost vhost,
@@ -103,24 +102,22 @@ public class SendingLink_1_0 implements
_durability = source.getDurable();
linkAttachment.setDeliveryStateHandler(this);
QueueDestination qd = null;
- AMQQueue queue = null;
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
boolean noLocal = false;
JMSSelectorFilter messageFilter = null;
- if(destination instanceof QueueDestination)
+ if(destination instanceof MessageSourceDestination)
{
- queue = ((QueueDestination) _destination).getQueue();
+ _queue = ((MessageSourceDestination) _destination).getQueue();
- if(queue.getAvailableAttributes().contains("topic"))
+ if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic"))
{
source.setDistributionMode(StdDistMode.COPY);
}
- qd = (QueueDestination) destination;
-
Map<Symbol,Filter> filters = source.getFilter();
Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
@@ -167,7 +164,13 @@ public class SendingLink_1_0 implements
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY);
+ _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+ if(source.getDistributionMode() != StdDistMode.COPY)
+ {
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+
}
else if(destination instanceof ExchangeDestination)
{
@@ -199,7 +202,7 @@ public class SendingLink_1_0 implements
name = UUID.randomUUID().toString();
}
- queue = _vhost.getQueue(name);
+ AMQQueue queue = _vhost.getQueue(name);
Exchange exchange = exchangeDestination.getExchange();
if(queue == null)
@@ -299,9 +302,10 @@ public class SendingLink_1_0 implements
}
}
}
+ _queue = queue;
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
- exchange.addBinding(binding,queue,null);
+ exchange.addBinding(binding, queue,null);
source.setDistributionMode(StdDistMode.COPY);
if(!isDurable)
@@ -309,10 +313,10 @@ public class SendingLink_1_0 implements
final String queueName = name;
final AMQQueue tempQueue = queue;
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
+ final Action<Connection_1_0> deleteQueueTask =
+ new Action<Connection_1_0>()
{
- public void doTask(Connection_1_0 session)
+ public void performAction(Connection_1_0 session)
{
if (_vhost.getQueue(queueName) == tempQueue)
{
@@ -331,9 +335,9 @@ public class SendingLink_1_0 implements
getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
}
@@ -347,31 +351,52 @@ public class SendingLink_1_0 implements
catch (AMQSecurityException e)
{
_logger.error("Security error", e);
+ throw new RuntimeException(e);
}
catch (AMQInternalException e)
{
_logger.error("Internal error", e);
+ throw new RuntimeException(e);
}
catch (AMQException e)
{
_logger.error("Error", e);
+ throw new RuntimeException(e);
}
- _subscription = new Subscription_1_0(this, qd, true);
+
+ _target = new ConsumerTarget_1_0(this, true);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+
+ }
+ else
+ {
+ throw new RuntimeException("Unknown destination type");
}
- if(_subscription != null)
+ if(_target != null)
{
- _subscription.setNoLocal(noLocal);
- if(messageFilter!=null)
+ if(noLocal)
{
- _subscription.setFilters(new SimpleFilterManager(messageFilter));
+ options.add(Consumer.Option.NO_LOCAL);
}
try
{
-
- queue.registerSubscription(_subscription, false);
+ final String name;
+ if(getEndpoint().getTarget() instanceof Target)
+ {
+ Target target = (Target) getEndpoint().getTarget();
+ name = target.getAddress() == null ? getEndpoint().getName() : target.getAddress();
+ }
+ else
+ {
+ name = getEndpoint().getName();
+ }
+ _consumer = _queue.addConsumer(_target,
+ messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ Message_1_0.class, name, options);
}
catch (AMQException e)
{
@@ -394,12 +419,11 @@ public class SendingLink_1_0 implements
// if not durable or close
if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
{
- AMQQueue queue = _subscription.getQueue();
try
{
- queue.unregisterSubscription(_subscription);
+ _consumer.close();
}
catch (AMQException e)
@@ -426,7 +450,7 @@ public class SendingLink_1_0 implements
{
try
{
- queue.getVirtualHost().removeQueue(queue);
+ _vhost.removeQueue((AMQQueue)_queue);
}
catch(AMQException e)
{
@@ -443,7 +467,7 @@ public class SendingLink_1_0 implements
else if(detach == null || detach.getError() != null)
{
_linkAttachment = null;
- _subscription.flowStateChanged();
+ _target.flowStateChanged();
}
else
{
@@ -491,7 +515,7 @@ public class SendingLink_1_0 implements
}
if(_resumeAcceptedTransfers.isEmpty())
{
- _subscription.flowStateChanged();
+ _target.flowStateChanged();
}
}
@@ -531,7 +555,7 @@ public class SendingLink_1_0 implements
}
}
- public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+ public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
{
_unsettledActionMap.put(tag,unsettledAction);
if(getTransactionId() == null)
@@ -593,9 +617,9 @@ public class SendingLink_1_0 implements
public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
{
- if(_subscription.isActive())
+ if(_consumer.isActive())
{
- _subscription.suspend();
+ _target.suspend();
}
_linkAttachment = linkAttachment;
@@ -603,14 +627,14 @@ public class SendingLink_1_0 implements
SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
endpoint.setDeliveryStateHandler(this);
Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
- Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
_resumeAcceptedTransfers.clear();
_resumeFullTransfers.clear();
- for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+ for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
- final QueueEntry queueEntry = entry.getValue();
+ final MessageInstance queueEntry = entry.getValue();
if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
{
queueEntry.setRedelivered();
@@ -624,7 +648,7 @@ public class SendingLink_1_0 implements
if(outcome instanceof Accepted)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
@@ -644,7 +668,7 @@ public class SendingLink_1_0 implements
else if(outcome instanceof Released)
{
AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
- if(_subscription.acquires())
+ if(_consumer.acquires())
{
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
@@ -678,9 +702,9 @@ public class SendingLink_1_0 implements
public Map getUnsettledOutcomeMap()
{
- Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+ Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
- for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+ for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
{
entry.setValue(null);
}
@@ -692,4 +716,9 @@ public class SendingLink_1_0 implements
{
_closeAction = action;
}
+
+ public VirtualHost getVirtualHost()
+ {
+ return _vhost;
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Feb 12 13:27:51 2014
@@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityExcept
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -48,6 +50,7 @@ import org.apache.qpid.server.protocol.L
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
@@ -108,11 +111,11 @@ public class Session_1_0 implements Sess
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- AMQQueue queue = _vhost.getQueue(addr);
+ MessageSource queue = _vhost.getMessageSource(addr);
if(queue != null)
{
- destination = new QueueDestination(queue);
+ destination = new MessageSourceDestination(queue);
@@ -249,11 +252,11 @@ public class Session_1_0 implements Sess
}
String addr = target.getAddress();
- Exchange exchg = _vhost.getExchange(addr);
- if(exchg != null)
+ MessageDestination messageDestination = _vhost.getMessageDestination(addr);
+ if(messageDestination != null)
{
- destination = new ExchangeDestination(exchg, target.getDurable(),
- target.getExpiryPolicy());
+ destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+ target.getExpiryPolicy());
}
else
{
@@ -343,10 +346,10 @@ public class Session_1_0 implements Sess
if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
{
- final Connection_1_0.Task deleteQueueTask =
- new Connection_1_0.Task()
+ final Action<Connection_1_0> deleteQueueTask =
+ new Action<Connection_1_0>()
{
- public void doTask(Connection_1_0 session)
+ public void performAction(Connection_1_0 session)
{
if (_vhost.getQueue(queueName) == tempQueue)
{
@@ -365,9 +368,9 @@ public class Session_1_0 implements Sess
_connection.addConnectionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
_connection.removeConnectionCloseTask(deleteQueueTask);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,50 +16,50 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-broker-plugins-amqp-msg-conv-0-10-to-1-0</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid AMQP 0-10 to 1-0 Message Conversion Broker Plug-in</name>
+ <description>AMQP message conversion (0-10 to 1-0) broker plug-in</description>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-common</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-common</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
</dependencies>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,43 +16,44 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-broker-plugins-amqp-msg-conv-0-8-to-0-10</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid AMQP 0-8 to 0-10 Message Conversion Broker Plug-in</name>
+ <description>AMQP message conversion (0-8, 0-9 and 0-9-1 to 0-10) broker plug-in</description>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-common</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
</dependencies>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java Wed Feb 12 13:27:51 2014
@@ -61,7 +61,7 @@ public class MessageConverter_0_10_to_0_
{
if(deliveryProps.hasDeliveryMode())
{
- props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
+ props.setDeliveryMode((deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
? BasicContentHeaderProperties.PERSISTENT
: BasicContentHeaderProperties.NON_PERSISTENT));
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,50 +16,50 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-broker-plugins-amqp-msg-conv-0-8-to-1-0</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid AMQP 0-8 to 1-0 Message Conversion Broker Plug-in</name>
+ <description>AMQP message conversion (0-8, 0-9 and 0-9-1 to 1-0) broker plug-in</description>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-common</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-common</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
</dependencies>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,50 +16,49 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-broker-plugins-derby-store</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid Derby Message Store</name>
+ <description>Apache Derby DB message store broker plug-in</description>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
- <scope>compile</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
- <version>10.8.2.2</version>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>${log4j-version}</version>
- <scope>compile</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-test-utils</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,29 +16,31 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-broker-plugins-jdbc-provider-bone</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid JDBC Message Store Connection Pooling Plug-in</name>
+ <description>JDBC Message Store Connection Pooling broker plug-in using BoneCP</description>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.jolbox</groupId>
<artifactId>bonecp</artifactId>
- <version>0.7.1.RELEASE</version>
- <scope>compile</scope>
<exclusions>
<exclusion>
<!-- exclude and specify a fixed version below -->
@@ -51,7 +53,6 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>14.0.1</version>
<scope>runtime</scope>
</dependency>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/pom.xml Wed Feb 12 13:27:51 2014
@@ -16,43 +16,45 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.qpid</groupId>
- <artifactId>qpid-project</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <artifactId>qpid-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>qpid-broker-plugins-jdbc-store</artifactId>
+ <version>0.28-SNAPSHOT</version>
+ <name>Qpid JDBC Message Store Broker Plug-in</name>
+ <description>JDBC message store broker plug-in</description>
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>${log4j-version}</version>
- <scope>compile</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-test-utils</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
- <version>0.26-SNAPSHOT</version>
+ <version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
@@ -60,7 +62,6 @@
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
- <version>10.8.2.2</version>
<scope>test</scope>
</dependency>
</dependencies>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org