You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/11 00:11:10 UTC
svn commit: r752300 [8/12] - in /qpid/branches/qpid-1673/qpid: cpp/
cpp/examples/ cpp/examples/direct/ cpp/examples/failover/
cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/examples/qmf-console/
cpp/examples/request-response/ cpp/examples/tradedemo/ cp...
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Tue Mar 10 23:10:57 2009
@@ -29,8 +29,11 @@
public class AMQQueueFactoryTest extends TestCase
{
+ final int MAX_SIZE = 50;
+
QueueRegistry _queueRegistry;
VirtualHost _virtualHost;
+ protected FieldTable _arguments;
public void setUp()
{
@@ -41,6 +44,15 @@
_queueRegistry = _virtualHost.getQueueRegistry();
assertEquals("Queues registered on an empty virtualhost", 0, _queueRegistry.getQueues().size());
+
+
+ _arguments = new FieldTable();
+
+ //Configure the queue arguments to use the flow-to-disk policy
+ _arguments.put(AMQQueueFactory.QPID_POLICY_TYPE, AMQQueueFactory.QPID_FLOW_TO_DISK);
+ // each message in the QBAAT is around 9-10 bytes each so only give space for half
+
+ _arguments.put(AMQQueueFactory.QPID_MAX_SIZE, MAX_SIZE);
}
public void tearDown()
@@ -50,17 +62,19 @@
}
- public void testPriorityQueueRegistration()
+ protected AMQQueue createQueue() throws AMQException
{
- FieldTable fieldTable = new FieldTable();
- fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
+ return AMQQueueFactory.createAMQQueueImpl(new AMQShortString(this.getName()), false, new AMQShortString("owner"), false,
+ _virtualHost, _arguments);
+ }
+
+ public void testQueueRegistration()
+ {
try
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
- _virtualHost, fieldTable);
-
- assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
+ AMQQueue queue = createQueue();
+ assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
}
catch (AMQException e)
{
@@ -68,18 +82,20 @@
}
}
-
- public void testSimpleQueueRegistration()
+ public void testQueueValuesAfterCreation()
{
try
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
- _virtualHost, null);
- assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
+ AMQQueue queue = createQueue();
+
+ assertEquals("MemoryMaximumSize not set correctly:", MAX_SIZE, queue.getMemoryUsageMaximum());
+ assertEquals("MemoryMinimumSize not defaulted to half maximum:", MAX_SIZE / 2, queue.getMemoryUsageMinimum());
+
}
catch (AMQException e)
{
fail(e.getMessage());
}
}
+
}
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Mar 10 23:10:57 2009
@@ -73,7 +73,7 @@
sendMessages(messageCount, false);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ long queueDepth = (messageCount * MESSAGE_SIZE);
assertTrue(_queueMBean.getQueueDepth() == queueDepth);
_queueMBean.deleteMessageFromTop();
@@ -94,7 +94,7 @@
sendMessages(messageCount, true);
assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ long queueDepth = (messageCount * MESSAGE_SIZE);
assertTrue(_queueMBean.getQueueDepth() == queueDepth);
_queueMBean.deleteMessageFromTop();
@@ -175,7 +175,7 @@
assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
- assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10));
+ assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth));
assertTrue(_queueMBean.getName().equals("testQueue"));
assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Tue Mar 10 23:10:57 2009
@@ -22,6 +22,7 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
public class MockAMQMessage extends TransientAMQMessage
{
@@ -29,6 +30,7 @@
throws AMQException
{
super(messageId);
+ _messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null);
}
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Tue Mar 10 23:10:57 2009
@@ -115,6 +115,11 @@
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean isFlowed()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public int getMessageCount()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
@@ -216,6 +221,26 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public long getMemoryUsageMaximum()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setMemoryUsageMaximum(long maximumMemoryUsage)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getMemoryUsageMinimum()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setMemoryUsageMinimum(long minimumMemoryUsage)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public long getMaximumMessageSize()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
@@ -271,7 +296,6 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
public void checkMessageStatus() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -302,6 +326,11 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public long getMemoryUsageCurrent()
+ {
+ return 0;
+ }
+
public ManagedObject getManagedObject()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -312,7 +341,6 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
public void setMinimumAlertRepeatGap(long value)
{
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java Tue Mar 10 23:10:57 2009
@@ -21,16 +21,25 @@
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
public class QueueEntryImplTest extends TestCase
{
- /**
- * Test the Redelivered state of a QueueEntryImpl
- */
+ /** Test the Redelivered state of a QueueEntryImpl */
public void testRedelivered()
{
- QueueEntry entry = new QueueEntryImpl(null, null);
+ QueueEntry entry = new MockQueueEntry(null);
assertFalse("New message should not be redelivered", entry.isRedelivered());
@@ -45,5 +54,187 @@
}
+ public void testImmediateAndNotDelivered()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertTrue("Undelivered Immediate message should still be marked as so", queueEntry.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ queueEntry.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ assertFalse("Delivered Immediate message now be marked as so", queueEntry.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testNotImmediateAndNotDelivered()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("Undelivered Non-Immediate message should not result in true.", queueEntry.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ queueEntry.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ assertFalse("Delivered Non-Immediate message not change this return", queueEntry.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testExpiry()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+ message.setExpiration(System.currentTimeMillis() + 10L);
+
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("New messages should not be expired.", queueEntry.expired());
+
+ final long MILLIS = 1000000L;
+ long waitTime = 20 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertTrue("After a sleep messages should now be expired.", queueEntry.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testNoExpiry()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("New messages should not be expired.", queueEntry.expired());
+
+ final long MILLIS = 1000000L;
+ long waitTime = 10 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertFalse("After a sleep messages without an expiry should not expire.", queueEntry.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
}
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Mar 10 23:10:57 2009
@@ -21,8 +21,6 @@
*/
import junit.framework.TestCase;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -31,17 +29,18 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.ArrayList;
import java.util.List;
@@ -51,7 +50,7 @@
protected SimpleAMQQueue _queue;
protected VirtualHost _virtualHost;
- protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
+ protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore();
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -60,7 +59,7 @@
protected FieldTable _arguments = null;
MessagePublishInfo info = new MessagePublishInfoImpl();
- private static final long MESSAGE_SIZE = 100;
+ protected static long MESSAGE_SIZE = 100;
@Override
protected void setUp() throws Exception
@@ -70,7 +69,7 @@
ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -320,8 +319,8 @@
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -335,18 +334,18 @@
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store);
+ msg.routingComplete(_transactionLog);
- _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
+ _transactionLog.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
- List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
+ List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
assertNotNull(data);
// Dequeue message
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
- AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
+ AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _transactionLog);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
MockQueueEntry entry = new MockQueueEntry(message, _queue);
@@ -355,10 +354,164 @@
entry.dequeue(null);
// Check that it is dequeued
- data = _store.getMessageReferenceMap(messageId);
+ data = _transactionLog.getMessageReferenceMap(messageId);
assertNull(data);
}
+ public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ MESSAGE_SIZE = 1;
+ long MEMORY_MAX = 500;
+ int MESSAGE_COUNT = (int) MEMORY_MAX * 2;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT / 2; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold MESSAGE_COUNT / 2 messages without flowing
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send another and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(MESSAGE_COUNT / 2 + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT / 2, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ //send another (MESSAGE_COUNT / 2) - 1 so there are MESSAGE_COUNT msgs in total on the queue
+ for (int msgCount = 0; msgCount < (MESSAGE_COUNT / 2) - 1; msgCount++)
+ {
+ sendMessage(txnContext);
+
+ long usage = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + usage,
+ usage <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + usage, usage > 0);
+
+ }
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.registerSubscription(_subscription, false);
+
+ int slept = 0;
+ while (_subscription.getQueueEntries().size() != MESSAGE_COUNT && slept < 10)
+ {
+ Thread.sleep(500);
+ slept++;
+ }
+
+ //Ensure the messages are retrieved
+ assertEquals("Not all messages were received, slept:" + slept / 2 + "s", MESSAGE_COUNT, _subscription.getQueueEntries().size());
+
+ //Check the queue is still within its limits.
+ long current = _queue.getMemoryUsageCurrent();
+ assertTrue("Queue has gone over quota:" + current + "/" + _queue.getMemoryUsageMaximum(),
+ current <= _queue.getMemoryUsageMaximum());
+
+ assertTrue("Queue has a negative quota:" + _queue.getMemoryUsageCurrent(), _queue.getMemoryUsageCurrent() >= 0);
+
+ for (int index = 0; index < MESSAGE_COUNT; index++)
+ {
+ // Ensure that we have received the messages and it wasn't flushed to disk before we received it.
+ AMQMessage message = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + message.debugIdentity() + " was null.", message);
+ }
+ }
+
+ public void testMessagesFlowToDiskPurger() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ MESSAGE_SIZE = 1;
+ /** Set to larger than the purge batch size. Default 100.
+ * @see FlowableBaseQueueEntryList.BATCH_PROCESS_COUNT */
+ long MEMORY_MAX = 500;
+ int MESSAGE_COUNT = (int) MEMORY_MAX;
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(MEMORY_MAX);
+
+ for (int msgCount = 0; msgCount < MESSAGE_COUNT; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold all messages without flowing
+ assertEquals(MESSAGE_COUNT, _queue.getMessageCount());
+ assertEquals(MEMORY_MAX, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send another and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(MESSAGE_COUNT, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.setMemoryUsageMaximum(0L);
+
+ //Give the purger time to work maximum of 1s
+ int slept = 0;
+ while (_queue.getMemoryUsageCurrent() > 0 && slept < 5)
+ {
+ Thread.yield();
+ Thread.sleep(200);
+ slept++;
+ }
+
+ assertEquals(MESSAGE_COUNT + 1, _queue.getMessageCount());
+ assertEquals(0L, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ }
+
+ protected void sendMessage(TransactionalContext txnContext) throws AMQException
+ {
+ sendMessage(txnContext, 5);
+ }
+
+ protected void sendMessage(TransactionalContext txnContext, int priority) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setPriority((byte) priority);
+ msg.setContentHeaderBody(contentHeaderBody);
+
+ long messageId = msg.getMessageId();
+
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
+ // Send one persistent message
+
+ qs.add(_queue);
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ msg.addContentBodyFrame(new MockContentChunk(1));
+
+ msg.deliverToQueues();
+
+ //Check message was correctly enqueued
+ List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(data);
+ }
+
+
// FIXME: move this to somewhere useful
private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{
@@ -384,7 +537,7 @@
public AMQMessage createMessage() throws AMQException
{
- AMQMessage message = new TestMessage(info, _store);
+ AMQMessage message = new TestMessage(info, _transactionLog);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -410,7 +563,6 @@
_transactionLog = transactionLog;
}
-
void assertCountEquals(int expected)
{
assertEquals("Wrong count for message with tag " + _tag, expected,
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java Tue Mar 10 23:10:57 2009
@@ -287,180 +287,5 @@
assertFalse(_message.isPersistent());
}
- public void testImmediateAndNotDelivered()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- try
- {
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered());
-
- assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
-
- _message.setDeliveredToConsumer();
-
- assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
-
- assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
- public void testNotImmediateAndNotDelivered()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- try
- {
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered());
-
- assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
-
- _message.setDeliveredToConsumer();
-
- assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
-
- assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
- public void testExpiry()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- ReentrantLock waitLock = new ReentrantLock();
- Condition wait = waitLock.newCondition();
- try
- {
- _message.setExpiration(System.currentTimeMillis() + 10L);
-
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("New messages should not be expired.", _message.expired());
-
- final long MILLIS =1000000L;
- long waitTime = 20 * MILLIS;
-
- while (waitTime > 0)
- {
- try
- {
- waitLock.lock();
-
- waitTime = wait.awaitNanos(waitTime);
- }
- catch (InterruptedException e)
- {
- //Stop if we are interrupted
- fail(e.getMessage());
- }
- finally
- {
- waitLock.unlock();
- }
-
- }
-
- assertTrue("After a sleep messages should now be expired.", _message.expired());
-
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
-
- public void testNoExpiry()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- ReentrantLock waitLock = new ReentrantLock();
- Condition wait = waitLock.newCondition();
- try
- {
-
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("New messages should not be expired.", _message.expired());
-
- final long MILLIS =1000000L;
- long waitTime = 10 * MILLIS;
-
- while (waitTime > 0)
- {
- try
- {
- waitLock.lock();
-
- waitTime = wait.awaitNanos(waitTime);
- }
- catch (InterruptedException e)
- {
- //Stop if we are interrupted
- fail(e.getMessage());
- }
- finally
- {
- waitLock.unlock();
- }
-
- }
-
- assertFalse("After a sleep messages without an expiry should not expire.", _message.expired());
-
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
+
}
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java Tue Mar 10 23:10:57 2009
@@ -27,6 +27,7 @@
import junit.framework.TestCase;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.configuration.SecurityConfiguration;
@@ -79,7 +80,7 @@
assertTrue(_authzManager.authorisePurge(_session, queue));
}
- public void testACLManagerConfigurationPluginManagerACLPlugin()
+ public void testACLManagerConfigurationPluginManagerACLPlugin() throws ConfigurationException
{
_authzManager = new ACLManager(_conf, _pluginManager, ExchangeDenier.FACTORY);
@@ -87,7 +88,7 @@
assertFalse(_authzManager.authoriseDelete(_session, exchange));
}
- public void testConfigurePlugins()
+ public void testConfigurePlugins() throws ConfigurationException
{
Configuration hostConfig = new PropertiesConfiguration();
hostConfig.setProperty("queueDenier", "thisoneneither");
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBeanTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBeanTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBeanTest.java Tue Mar 10 23:10:57 2009
@@ -21,103 +21,213 @@
package org.apache.qpid.server.security.access.management;
+import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
-import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.management.MBeanInvocationHandlerImpl;
+import org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase;
import junit.framework.TestCase;
+/* Note: The main purpose is to test the jmx access rights file manipulation
+ * within AMQUserManagementMBean. The Principal Databases are tested by their own tests,
+ * this test just exercises their usage in AMQUserManagementMBean.
+ */
public class AMQUserManagementMBeanTest extends TestCase
{
- private Base64MD5PasswordFilePrincipalDatabase _database;
+ private PlainPasswordFilePrincipalDatabase _database;
private AMQUserManagementMBean _amqumMBean;
+
+ private File _passwordFile;
+ private File _accessFile;
- private static final String _QPID_HOME = System.getProperty("QPID_HOME");
-
- private static final String USERNAME = "testuser";
- private static final String PASSWORD = "password";
- private static final String JMXRIGHTS = "admin";
- private static final String TEMP_PASSWORD_FILE_NAME = "tempPasswordFile.tmp";
- private static final String TEMP_JMXACCESS_FILE_NAME = "tempJMXAccessFile.tmp";
+ private static final String TEST_USERNAME = "testuser";
+ private static final String TEST_PASSWORD = "password";
@Override
protected void setUp() throws Exception
{
- assertNotNull("QPID_HOME not set", _QPID_HOME);
-
- _database = new Base64MD5PasswordFilePrincipalDatabase();
+ _database = new PlainPasswordFilePrincipalDatabase();
_amqumMBean = new AMQUserManagementMBean();
+ loadFreshTestPasswordFile();
+ loadFreshTestAccessFile();
}
@Override
protected void tearDown() throws Exception
{
- File testFile = new File(_QPID_HOME + File.separator + TEMP_JMXACCESS_FILE_NAME + ".tmp");
- if (testFile.exists())
+ _passwordFile.delete();
+ _accessFile.delete();
+ }
+
+ public void testDeleteUser()
+ {
+ loadFreshTestPasswordFile();
+ loadFreshTestAccessFile();
+
+ //try deleting a non-existent user
+ assertFalse(_amqumMBean.deleteUser("made.up.username"));
+
+ assertTrue(_amqumMBean.deleteUser(TEST_USERNAME));
+ }
+
+ public void testDeleteUserIsSavedToAccessFile()
+ {
+ loadFreshTestPasswordFile();
+ loadFreshTestAccessFile();
+
+ assertTrue(_amqumMBean.deleteUser(TEST_USERNAME));
+
+ //check the access rights were actually deleted from the file
+ try{
+ BufferedReader reader = new BufferedReader(new FileReader(_accessFile));
+
+ //check the 'generated by' comment line is present
+ assertTrue("File has no content", reader.ready());
+ assertTrue("'Generated by' comment line was missing",reader.readLine().contains("Generated by " +
+ "AMQUserManagementMBean Console : Last edited by user:"));
+
+ //there should also be a modified date/time comment line
+ assertTrue("File has no modified date/time comment line", reader.ready());
+ assertTrue("Modification date/time comment line was missing",reader.readLine().startsWith("#"));
+
+ //the access file should not contain any further data now as we just deleted the only user
+ assertFalse("User access data was present when it should have been deleted", reader.ready());
+ }
+ catch (IOException e)
{
- testFile.delete();
+ fail("Unable to valdate file contents due to:" + e.getMessage());
}
+
+ }
- testFile = new File(_QPID_HOME + File.separator + TEMP_JMXACCESS_FILE_NAME + ".old");
- if (testFile.exists())
+ public void testSetRights()
+ {
+ loadFreshTestPasswordFile();
+ loadFreshTestAccessFile();
+
+ assertFalse(_amqumMBean.setRights("made.up.username", true, false, false));
+
+ assertTrue(_amqumMBean.setRights(TEST_USERNAME, true, false, false));
+ assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, true, false));
+ assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, false, true));
+ }
+
+ public void testSetRightsIsSavedToAccessFile()
+ {
+ loadFreshTestPasswordFile();
+ loadFreshTestAccessFile();
+
+ assertTrue(_amqumMBean.setRights(TEST_USERNAME, false, false, true));
+
+ //check the access rights were actually updated in the file
+ try{
+ BufferedReader reader = new BufferedReader(new FileReader(_accessFile));
+
+ //check the 'generated by' comment line is present
+ assertTrue("File has no content", reader.ready());
+ assertTrue("'Generated by' comment line was missing",reader.readLine().contains("Generated by " +
+ "AMQUserManagementMBean Console : Last edited by user:"));
+
+ //there should also be a modified date/time comment line
+ assertTrue("File has no modified date/time comment line", reader.ready());
+ assertTrue("Modification date/time comment line was missing",reader.readLine().startsWith("#"));
+
+ //the next line should hold the test user's updated (admin) access rights
+ assertTrue("User access data was not updated in the access file",
+ reader.readLine().equals(TEST_USERNAME + "=" + MBeanInvocationHandlerImpl.ADMIN));
+
+ //the access file should not contain any further data as the test user is its only entry
+ assertFalse("Additional user access data was present when there should be no more", reader.ready());
+ }
+ catch (IOException e)
{
- testFile.delete();
+ fail("Unable to valdate file contents due to:" + e.getMessage());
}
+ }
- testFile = new File(_QPID_HOME + File.separator + TEMP_PASSWORD_FILE_NAME + ".tmp");
- if (testFile.exists())
+ public void testMBeanVersion()
+ {
+ try
{
- testFile.delete();
+ ObjectName name = _amqumMBean.getObjectName();
+ assertEquals(AMQUserManagementMBean.VERSION, Integer.parseInt(name.getKeyProperty("version")));
}
-
- testFile = new File(_QPID_HOME + File.separator + TEMP_PASSWORD_FILE_NAME + ".old");
- if (testFile.exists())
+ catch (MalformedObjectNameException e)
{
- testFile.delete();
+ fail(e.getMessage());
}
}
- public void testDeleteUser()
+ public void testSetAccessFileWithMissingFile()
{
- loadTestPasswordFile();
- loadTestAccessFile();
-
- boolean deleted = false;
+ try
+ {
+ _amqumMBean.setAccessFile("made.up.filename");
+ }
+ catch (IOException e)
+ {
+ fail("Should not have been an IOE." + e.getMessage());
+ }
+ catch (ConfigurationException e)
+ {
+ assertTrue(e.getMessage(), e.getMessage().endsWith("does not exist"));
+ }
+ }
+ public void testSetAccessFileWithReadOnlyFile()
+ {
+ File testFile = null;
try
{
- deleted = _amqumMBean.deleteUser(USERNAME);
+ testFile = File.createTempFile(this.getClass().getName(),".access.readonly");
+ BufferedWriter passwordWriter = new BufferedWriter(new FileWriter(testFile, false));
+ passwordWriter.write(TEST_USERNAME + ":" + TEST_PASSWORD);
+ passwordWriter.newLine();
+ passwordWriter.flush();
+ passwordWriter.close();
+
+ testFile.setReadOnly();
+ _amqumMBean.setAccessFile(testFile.getPath());
}
- catch(Exception e){
- fail("Unable to delete user: " + e.getMessage());
+ catch (IOException e)
+ {
+ fail("Access file was not created." + e.getMessage());
+ }
+ catch (ConfigurationException e)
+ {
+ fail("There should not have been a configuration exception." + e.getMessage());
}
- assertTrue(deleted);
+ testFile.delete();
}
-
-
+
// ============================ Utility methods =========================
- private void loadTestPasswordFile()
+ private void loadFreshTestPasswordFile()
{
try
{
- File tempPasswordFile = new File(_QPID_HOME + File.separator + TEMP_PASSWORD_FILE_NAME);
- if (tempPasswordFile.exists())
+ if(_passwordFile == null)
{
- tempPasswordFile.delete();
+ _passwordFile = File.createTempFile(this.getClass().getName(),".password");
}
- tempPasswordFile.deleteOnExit();
- BufferedWriter passwordWriter = new BufferedWriter(new FileWriter(tempPasswordFile));
- passwordWriter.write(USERNAME + ":" + PASSWORD);
+ BufferedWriter passwordWriter = new BufferedWriter(new FileWriter(_passwordFile, false));
+ passwordWriter.write(TEST_USERNAME + ":" + TEST_PASSWORD);
passwordWriter.newLine();
passwordWriter.flush();
-
- _database.setPasswordFile(tempPasswordFile.toString());
+ passwordWriter.close();
+ _database.setPasswordFile(_passwordFile.toString());
_amqumMBean.setPrincipalDatabase(_database);
}
catch (IOException e)
@@ -126,27 +236,36 @@
}
}
- private void loadTestAccessFile()
+ private void loadFreshTestAccessFile()
{
try
{
- File tempAccessFile = new File(_QPID_HOME + File.separator + TEMP_JMXACCESS_FILE_NAME);
- if (tempAccessFile.exists())
+ if(_accessFile == null)
{
- tempAccessFile.delete();
+ _accessFile = File.createTempFile(this.getClass().getName(),".access");
}
- tempAccessFile.deleteOnExit();
-
- BufferedWriter accessWriter = new BufferedWriter(new FileWriter(tempAccessFile));
- accessWriter.write(USERNAME + "=" + JMXRIGHTS);
+
+ BufferedWriter accessWriter = new BufferedWriter(new FileWriter(_accessFile,false));
+ accessWriter.write("#Last Updated By comment");
+ accessWriter.newLine();
+ accessWriter.write("#Date/time comment");
+ accessWriter.newLine();
+ accessWriter.write(TEST_USERNAME + "=" + MBeanInvocationHandlerImpl.READONLY);
accessWriter.newLine();
accessWriter.flush();
+ accessWriter.close();
+ }
+ catch (IOException e)
+ {
+ fail("Unable to create test access file: " + e.getMessage());
+ }
- _amqumMBean.setAccessFile(tempAccessFile.toString());
+ try{
+ _amqumMBean.setAccessFile(_accessFile.toString());
}
catch (Exception e)
{
- fail("Unable to create test access file: " + e.getMessage());
+ fail("Unable to set access file: " + e.getMessage());
}
}
}
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabaseTest.java Tue Mar 10 23:10:57 2009
@@ -22,8 +22,10 @@
import junit.framework.TestCase;
+import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.login.AccountNotFoundException;
+import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import java.io.BufferedReader;
@@ -33,7 +35,9 @@
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.security.Principal;
+import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
@@ -41,12 +45,38 @@
{
private static final String TEST_COMMENT = "# Test Comment";
- private String USERNAME = "testUser";
- private String _username = this.getClass().getName()+"username";
- private char[] _password = "password".toCharArray();
- private Principal _principal = new UsernamePrincipal(_username);
+
+ private static final String USERNAME = "testUser";
+ private static final String PASSWORD = "guest";
+ private static final String PASSWORD_B64MD5HASHED = "CE4DQ6BIb/BVMN9scFyLtA==";
+ private static char[] PASSWORD_MD5_CHARS;
+ private static final String PRINCIPAL_USERNAME = "testUserPrincipal";
+ private static final Principal PRINCIPAL = new UsernamePrincipal(PRINCIPAL_USERNAME);
private Base64MD5PasswordFilePrincipalDatabase _database;
private File _pwdFile;
+
+ static
+ {
+ try
+ {
+ Base64 b64 = new Base64();
+ byte[] md5passBytes = PASSWORD_B64MD5HASHED.getBytes(Base64MD5PasswordFilePrincipalDatabase.DEFAULT_ENCODING);
+ byte[] decoded = b64.decode(md5passBytes);
+
+ PASSWORD_MD5_CHARS = new char[decoded.length];
+
+ int index = 0;
+ for (byte c : decoded)
+ {
+ PASSWORD_MD5_CHARS[index++] = (char) c;
+ }
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ fail("Unable to perform B64 decode to get the md5 char[] password");
+ }
+ }
+
public void setUp() throws Exception
{
@@ -111,7 +141,56 @@
loadPasswordFile(testFile);
- final String CREATED_PASSWORD = "createdPassword";
+
+ Principal principal = new Principal()
+ {
+ public String getName()
+ {
+ return USERNAME;
+ }
+ };
+
+ assertTrue("New user not created.", _database.createPrincipal(principal, PASSWORD.toCharArray()));
+
+ PasswordCallback callback = new PasswordCallback("prompt",false);
+ try
+ {
+ _database.setPassword(principal, callback);
+ }
+ catch (AccountNotFoundException e)
+ {
+ fail("user account did not exist");
+ }
+ assertTrue("Password returned was incorrect.", Arrays.equals(PASSWORD_MD5_CHARS, callback.getPassword()));
+
+ loadPasswordFile(testFile);
+
+ try
+ {
+ _database.setPassword(principal, callback);
+ }
+ catch (AccountNotFoundException e)
+ {
+ fail("user account did not exist");
+ }
+ assertTrue("Password returned was incorrect.", Arrays.equals(PASSWORD_MD5_CHARS, callback.getPassword()));
+
+ assertNotNull("Created User was not saved", _database.getUser(USERNAME));
+
+ assertFalse("Duplicate user created.", _database.createPrincipal(principal, PASSWORD.toCharArray()));
+
+ testFile.delete();
+ }
+
+ public void testCreatePrincipalIsSavedToFile()
+ {
+
+ File testFile = createPasswordFile(1, 0);
+
+ loadPasswordFile(testFile);
+
+ final String CREATED_PASSWORD = "guest";
+ final String CREATED_B64MD5HASHED_PASSWORD = "CE4DQ6BIb/BVMN9scFyLtA==";
final String CREATED_USERNAME = "createdUser";
Principal principal = new Principal()
@@ -122,16 +201,37 @@
}
};
- assertTrue("New user not created.", _database.createPrincipal(principal, CREATED_PASSWORD.toCharArray()));
+ _database.createPrincipal(principal, CREATED_PASSWORD.toCharArray());
- loadPasswordFile(testFile);
+ try
+ {
+ BufferedReader reader = new BufferedReader(new FileReader(testFile));
+
+ assertTrue("File has no content", reader.ready());
+
+ assertEquals("Comment line has been corrupted.", TEST_COMMENT, reader.readLine());
- assertNotNull("Created User was not saved", _database.getUser(CREATED_USERNAME));
+ assertTrue("File is missing user data.", reader.ready());
- assertFalse("Duplicate user created.", _database.createPrincipal(principal, CREATED_PASSWORD.toCharArray()));
+ String userLine = reader.readLine();
+
+ String[] result = Pattern.compile(":").split(userLine);
+ assertEquals("User line not complete '" + userLine + "'", 2, result.length);
+
+ assertEquals("Username not correct,", CREATED_USERNAME, result[0]);
+ assertEquals("Password not correct,", CREATED_B64MD5HASHED_PASSWORD, result[1]);
+
+ assertFalse("File has more content", reader.ready());
+
+ }
+ catch (IOException e)
+ {
+ fail("Unable to valdate file contents due to:" + e.getMessage());
+ }
testFile.delete();
}
+
public void testDeletePrincipal()
{
@@ -228,8 +328,8 @@
assertNotNull(testUser);
- String NEW_PASSWORD = "NewPassword";
- String NEW_PASSWORD_HASH = "TmV3UGFzc3dvcmQ=";
+ String NEW_PASSWORD = "guest";
+ String NEW_PASSWORD_HASH = "CE4DQ6BIb/BVMN9scFyLtA==";
try
{
_database.updatePassword(testUser, NEW_PASSWORD.toCharArray());
@@ -268,7 +368,7 @@
testFile.delete();
}
- public void testSetPasswordWithMissingFile()
+ public void testSetPasswordFileWithMissingFile()
{
try
{
@@ -285,7 +385,7 @@
}
- public void testSetPasswordWithReadOnlyFile()
+ public void testSetPasswordFileWithReadOnlyFile()
{
File testFile = createPasswordFile(0, 0);
@@ -310,28 +410,38 @@
public void testCreateUserPrincipal() throws IOException
{
- _database.createPrincipal(_principal, _password);
- Principal newPrincipal = _database.getUser(_username);
+ _database.createPrincipal(PRINCIPAL, PASSWORD.toCharArray());
+ Principal newPrincipal = _database.getUser(PRINCIPAL_USERNAME);
assertNotNull(newPrincipal);
- assertEquals(_principal.getName(), newPrincipal.getName());
+ assertEquals(PRINCIPAL.getName(), newPrincipal.getName());
}
public void testVerifyPassword() throws IOException, AccountNotFoundException
{
testCreateUserPrincipal();
//assertFalse(_pwdDB.verifyPassword(_username, null));
- assertFalse(_database.verifyPassword(_username, new char[]{}));
- assertFalse(_database.verifyPassword(_username, "massword".toCharArray()));
- assertTrue(_database.verifyPassword(_username, _password));
+ assertFalse(_database.verifyPassword(PRINCIPAL_USERNAME, new char[]{}));
+ assertFalse(_database.verifyPassword(PRINCIPAL_USERNAME, (PASSWORD+"z").toCharArray()));
+ assertTrue(_database.verifyPassword(PRINCIPAL_USERNAME, PASSWORD.toCharArray()));
+
+ try
+ {
+ _database.verifyPassword("made.up.username", PASSWORD.toCharArray());
+ fail("Should not have been able to verify this non-existant users password.");
+ }
+ catch (AccountNotFoundException e)
+ {
+ // pass
+ }
}
public void testUpdatePassword() throws IOException, AccountNotFoundException
{
testCreateUserPrincipal();
char[] newPwd = "newpassword".toCharArray();
- _database.updatePassword(_principal, newPwd);
- assertFalse(_database.verifyPassword(_username, _password));
- assertTrue(_database.verifyPassword(_username, newPwd));
+ _database.updatePassword(PRINCIPAL, newPwd);
+ assertFalse(_database.verifyPassword(PRINCIPAL_USERNAME, PASSWORD.toCharArray()));
+ assertTrue(_database.verifyPassword(PRINCIPAL_USERNAME, newPwd));
}
-
+
}
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/database/HashedUserTest.java Tue Mar 10 23:10:57 2009
@@ -34,7 +34,7 @@
String USERNAME = "username";
String PASSWORD = "password";
- String HASHED_PASSWORD = "cGFzc3dvcmQ=";
+ String B64_ENCODED_PASSWORD = "cGFzc3dvcmQ=";
public void testToLongArrayConstructor()
{
@@ -57,11 +57,11 @@
{
try
{
- HashedUser user = new HashedUser(new String[]{USERNAME, HASHED_PASSWORD});
+ HashedUser user = new HashedUser(new String[]{USERNAME, B64_ENCODED_PASSWORD});
assertEquals("Username incorrect", USERNAME, user.getName());
int index = 0;
- char[] hash = HASHED_PASSWORD.toCharArray();
+ char[] hash = B64_ENCODED_PASSWORD.toCharArray();
try
{
Modified: qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/qpid-1673/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Tue Mar 10 23:10:57 2009
@@ -30,10 +30,13 @@
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
+import org.apache.log4j.Logger;
public class MockSubscription implements Subscription
{
+ private static final Logger _logger = Logger.getLogger(MockSubscription.class);
private boolean _closed = false;
private AMQShortString tag = new AMQShortString("mocktag");
@@ -41,8 +44,12 @@
private StateListener _listener = null;
private QueueEntry lastSeen = null;
private State _state = State.ACTIVE;
- private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
+ private ArrayList<QueueEntry> _queueEntries = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private ArrayList<AMQMessage> _messages = new ArrayList<AMQMessage>();
+
+
+
public void close()
{
@@ -136,10 +143,14 @@
{
}
- public void send(QueueEntry msg) throws AMQException
+ public void send(QueueEntry entry) throws AMQException
{
- lastSeen = msg;
- messages.add(msg);
+ _logger.info("Sending Message(" + entry.debugIdentity() + ") to subscription:" + this);
+
+ lastSeen = entry;
+ _queueEntries.add(entry);
+ _messages.add(entry.getMessage());
+ entry.setDeliveredToSubscription();
}
public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
@@ -173,8 +184,14 @@
return false;
}
- public ArrayList<QueueEntry> getMessages()
+ public ArrayList<QueueEntry> getQueueEntries()
{
- return messages;
+ return _queueEntries;
}
+
+ public ArrayList<AMQMessage> getMessages()
+ {
+ return _messages;
+ }
+
}
Modified: qpid/branches/qpid-1673/qpid/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/build.deps?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/build.deps (original)
+++ qpid/branches/qpid-1673/qpid/java/build.deps Tue Mar 10 23:10:57 2009
@@ -92,7 +92,6 @@
client-example.libs=${client.libs}
testkit.libs=${client.libs}
-
ibm-icu=lib/com.ibm.icu_3.8.1.v20080530.jar
ecl-core-jface=lib/org.eclipse.jface_3.4.1.M20080827-2000.jar
ecl-core-jface-databinding=lib/org.eclipse.jface.databinding_1.2.1.M20080827-0800a.jar
@@ -114,30 +113,41 @@
ecl-ui=lib/org.eclipse.ui_3.4.1.M20080910-0800.jar
ecl-ui-forms=lib/org.eclipse.ui.forms_3.3.101.v20080708_34x.jar
ecl-ui-workbench=lib/org.eclipse.ui.workbench_3.4.1.M20080827-0800a.jar
+apache-commons-codec=lib/org.apache.commons.codec_1.3.0.v20080530-1600.jar
ecl-swt-win32-win32-x86=lib/org.eclipse.swt.win32.win32.x86_3.4.1.v3449c.jar
ecl-equinox-launcher-win32-win32-x86=lib/org.eclipse.equinox.launcher.win32.win32.x86_1.0.101.R34x_v20080731/**
ecl-swt-linux-gtk-x86=lib/org.eclipse.swt.gtk.linux.x86_3.4.1.v3449c.jar
ecl-equinox-launcher-linux-gtk-x86=lib/org.eclipse.equinox.launcher.gtk.linux.x86_1.0.101.R34x_v20080805/**
+ecl-swt-linux-gtk-x86_64=lib/org.eclipse.swt.gtk.linux.x86_64_3.4.1.v3449c.jar
+ecl-equinox-launcher-linux-gtk-x86_64=lib/org.eclipse.equinox.launcher.gtk.linux.x86_64_1.0.101.R34x_v20080731/**
ecl-swt-macosx-carbon=lib/org.eclipse.swt.carbon.macosx_3.4.1.v3449c.jar
ecl-equinox-launcher-macosx-carbon=lib/org.eclipse.equinox.launcher.carbon.macosx_1.0.101.R34x_v20080731/**
+ecl-swt-solaris-gtk-sparc=lib/org.eclipse.swt.gtk.solaris.sparc_3.4.1.v3449c.jar
+ecl-equinox-launcher-solaris-gtk-sparc=lib/org.eclipse.equinox.launcher.gtk.solaris.sparc_1.0.101.R34x_v20080731/**
management-eclipse-plugin-win32-win32-x86.libs=${management-eclipse-plugin.core-libs} \
${ecl-swt-win32-win32-x86} ${ecl-equinox-launcher-win32-win32-x86}
management-eclipse-plugin-linux-gtk-x86.libs=${management-eclipse-plugin.core-libs} \
${ecl-swt-linux-gtk-x86} ${ecl-equinox-launcher-linux-gtk-x86}
+management-eclipse-plugin-linux-gtk-x86_64.libs=${management-eclipse-plugin.core-libs} \
+ ${ecl-swt-linux-gtk-x86_64} ${ecl-equinox-launcher-linux-gtk-x86_64}
management-eclipse-plugin-macosx.libs=${management-eclipse-plugin.core-libs} \
${ecl-swt-macosx-carbon} ${ecl-equinox-launcher-macosx-carbon}
+management-eclipse-plugin-solaris-gtk-sparc.libs=${management-eclipse-plugin.core-libs} \
+ ${ecl-swt-solaris-gtk-sparc} ${ecl-equinox-launcher-solaris-gtk-sparc}
management-eclipse-plugin.core-libs=${ibm-icu} ${ecl-core-jface} ${ecl-core-jface-databinding} \
${ecl-core-commands} ${ecl-core-contenttype} ${ecl-core-databinding} ${ecl-core-expressions} \
${ecl-core-jobs} ${ecl-core-runtime} ${ecl-core-runtime-compat-registry} ${ecl-equinox-app} \
${ecl-equinox-common} ${ecl-equinox-launcher} ${ecl-equinox-prefs} ${ecl-equinox-registry} \
- ${ecl-help} ${ecl-osgi} ${ecl-swt} ${ecl-ui} ${ecl-ui-forms} ${ecl-ui-workbench}
+ ${ecl-help} ${ecl-osgi} ${ecl-swt} ${ecl-ui} ${ecl-ui-forms} ${ecl-ui-workbench} ${apache-commons-codec}
management-eclipse-plugin.platform-libs=${ecl-equinox-launcher-win32-win32-x86} \
${ecl-equinox-launcher-linux-gtk-x86} ${ecl-equinox-launcher-macosx-carbon} \
- ${ecl-swt-win32-win32-x86} ${ecl-swt-linux-gtk-x86} ${ecl-swt-macosx-carbon}
+ ${ecl-swt-win32-win32-x86} ${ecl-swt-linux-gtk-x86} ${ecl-swt-macosx-carbon} \
+ ${ecl-swt-linux-gtk-x86_64} ${ecl-equinox-launcher-linux-gtk-x86_64} \
+ ${ecl-swt-solaris-gtk-sparc} ${ecl-equinox-launcher-solaris-gtk-sparc}
management-eclipse-plugin.libs=${management-eclipse-plugin.core-libs} ${management-eclipse-plugin.platform-libs}
Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java Tue Mar 10 23:10:57 2009
@@ -42,6 +42,8 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
System.out.println("Message: " + xfr);
Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java Tue Mar 10 23:10:57 2009
@@ -42,6 +42,8 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
System.out.println("Message: " + xfr);
Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java Tue Mar 10 23:10:57 2009
@@ -32,6 +32,8 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
String body = xfr.getBodyString();
Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java Tue Mar 10 23:10:57 2009
@@ -44,6 +44,8 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
String body = xfr.getBodyString();
Modified: qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java Tue Mar 10 23:10:57 2009
@@ -40,6 +40,8 @@
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class);
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Mar 10 23:10:57 2009
@@ -268,6 +268,13 @@
//Indicates whether persistent messages are synchronized
private boolean _syncPersistence;
+ //Indicates whether we need to sync on every message ack
+ private boolean _syncAck;
+
+ //Indicates the sync publish options (persistent|all)
+ //By default it's async publish
+ private String _syncPublish = "";
+
/**
* @param broker brokerdetails
* @param username username
@@ -348,25 +355,53 @@
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
// set this connection maxPrefetch
- if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH) != null)
{
- _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ _maxPrefetch = Integer.parseInt(connectionURL.getOption(ConnectionURL.OPTIONS_MAXPREFETCH));
}
else
{
// use the default value set for all connections
_maxPrefetch = Integer.parseInt(System.getProperties().getProperty(ClientProperties.MAX_PREFETCH_PROP_NAME,
- ClientProperties.MAX_PREFETCH_DEFAULT));
+ ClientProperties.MAX_PREFETCH_DEFAULT));
}
- if (connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE) != null)
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE) != null)
{
- _syncPersistence = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.AMQ_SYNC_PERSISTENCE));
+ _syncPersistence =
+ Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PERSISTENCE));
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
}
else
{
// use the default value set for all connections
_syncPersistence = Boolean.getBoolean(ClientProperties.SYNC_PERSISTENT_PROP_NAME);
+ if (_syncPersistence)
+ {
+ _logger.warn("sync_persistence is a deprecated property, " +
+ "please use sync_publish={persistent|all} instead");
+ }
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK) != null)
+ {
+ _syncAck = Boolean.parseBoolean(connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_ACK));
+ }
+ else
+ {
+ // use the default value set for all connections
+ _syncAck = Boolean.getBoolean(ClientProperties.SYNC_ACK_PROP_NAME);
+ }
+
+ if (connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH) != null)
+ {
+ _syncPublish = connectionURL.getOption(ConnectionURL.OPTIONS_SYNC_PUBLISH);
+ }
+ else
+ {
+ // use the default value set for all connections
+ _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
}
_failoverPolicy = new FailoverPolicy(connectionURL, this);
@@ -1469,6 +1504,19 @@
return _syncPersistence;
}
+ /**
+ * Indicates whether we need to sync on every message ack
+ */
+ public boolean getSyncAck()
+ {
+ return _syncAck;
+ }
+
+ public String getSyncPublish()
+ {
+ return _syncPublish;
+ }
+
public void setIdleTimeout(long l)
{
_delegate.setIdleTimeout(l);
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Mar 10 23:10:57 2009
@@ -239,12 +239,6 @@
{
_conn.failoverPrep();
_qpidConnection.resume();
-
- if (_conn.firePreResubscribe())
- {
- _conn.resubscribeSessions();
- }
-
_conn.fireFailoverComplete();
return;
}
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java Tue Mar 10 23:10:57 2009
@@ -22,14 +22,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.AMQException;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
-
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -90,38 +88,11 @@
{
checkState();
final BasicMessageConsumer consumer =
- (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
_consumers.add(consumer);
- return new Enumeration()
- {
-
- Message _nextMessage = consumer == null ? null : consumer.receive(1000);
-
- public boolean hasMoreElements()
- {
- _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
- return (_nextMessage != null);
- }
-
- public Object nextElement()
- {
- Message msg = _nextMessage;
- try
- {
- _logger.info("QB:nextElement about to receive");
- _nextMessage = consumer.receive(1000);
- _logger.info("QB:nextElement received:" + _nextMessage);
- }
- catch (JMSException e)
- {
- _logger.warn("Exception caught while queue browsing", e);
- _nextMessage = null;
- }
- return msg;
- }
- };
+ return new QueueBrowserEnumeration(consumer);
}
public void close() throws JMSException
@@ -134,4 +105,39 @@
_consumers.clear();
}
+ private class QueueBrowserEnumeration implements Enumeration
+ {
+ Message _nextMessage;
+ private BasicMessageConsumer _consumer;
+
+ public QueueBrowserEnumeration(BasicMessageConsumer consumer) throws JMSException
+ {
+ _nextMessage = consumer == null ? null : consumer.receiveBrowse();
+ _logger.info("QB:created with first element:" + _nextMessage);
+ _consumer = consumer;
+ }
+
+ public boolean hasMoreElements()
+ {
+ _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ return (_nextMessage != null);
+ }
+
+ public Object nextElement()
+ {
+ Message msg = _nextMessage;
+ try
+ {
+ _logger.info("QB:nextElement about to receive");
+ _nextMessage = _consumer.receiveBrowse();
+ _logger.info("QB:nextElement received:" + _nextMessage);
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("Exception caught while queue browsing", e);
+ _nextMessage = null;
+ }
+ return msg;
+ }
+ }
}
Modified: qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/qpid-1673/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Mar 10 23:10:57 2009
@@ -575,12 +575,19 @@
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
{
+ bindQueue(queueName, routingKey, arguments, exchangeName, destination, false);
+ }
+
+ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName, final AMQDestination destination,
+ final boolean nowait) throws AMQException
+ {
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
+ sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait);
return null;
}
}, _connection).execute();
@@ -595,7 +602,8 @@
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
+ final AMQShortString exchangeName, AMQDestination destination,
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Closes the session.
@@ -1815,6 +1823,11 @@
void failoverPrep()
{
startDispatcherIfNecessary();
+ syncDispatchQueue();
+ }
+
+ void syncDispatchQueue()
+ {
final CountDownLatch signal = new CountDownLatch(1);
_queue.add(new Dispatchable() {
public void dispatch(AMQSession ssn)
@@ -1828,7 +1841,7 @@
}
catch (InterruptedException e)
{
- // pass
+ throw new RuntimeException(e);
}
}
@@ -1859,6 +1872,11 @@
_inRecovery = inRecovery;
}
+ boolean isStarted()
+ {
+ return _startedAtLeastOnce.get();
+ }
+
/**
* Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
*
@@ -2281,7 +2299,13 @@
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal) throws AMQException
+ {
+ return declareQueue(amqd, protocolHandler, noLocal, false);
+ }
+
+ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean noLocal, final boolean nowait)
throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -2296,14 +2320,15 @@
amqd.setQueueName(protocolHandler.generateQueueName());
}
- sendQueueDeclare(amqd, protocolHandler);
+ sendQueueDeclare(amqd, protocolHandler, nowait);
return amqd.getAMQQueueName();
}
}, _connection).execute();
}
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
+ public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2416,14 +2441,14 @@
AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
+ declareExchange(amqd, protocolHandler, nowait);
- AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal());
+ AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
// store the consumer queue name
consumer.setQueuename(queueName);
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
// If IMMEDIATE_PREFETCH is not required then suspend the channel to delay prefetch
if (!_immediatePrefetch)
@@ -2455,11 +2480,7 @@
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
- }
- catch (JMSException e) // thrown by getMessageSelector
- {
- throw new AMQException(null, e.getMessage(), e);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
}
catch (FailoverException e)
{
@@ -2531,8 +2552,9 @@
for (C consumer : consumers)
{
- consumer.failedOver();
+ consumer.failedOverPre();
registerConsumer(consumer, true);
+ consumer.failedOverPost();
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org