You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 16:42:53 UTC
svn commit: r1295627 [8/12] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/src/main/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
bdbsto...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java Thu Mar 1 15:42:44 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.server.logging.a
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
/**
@@ -49,10 +50,7 @@ import org.apache.qpid.server.logging.Nu
public class CurrentActorTest extends BaseConnectionActorTestCase
{
//Set this to be a reasonably large number
- int THREADS = 10;
-
- // Record any exceptions that are thrown by the threads
- Exception[] _errors = new Exception[THREADS];
+ private static final int THREADS = 10;
/**
* Test that CurrentActor behaves as LIFO queue.
@@ -161,19 +159,11 @@ public class CurrentActorTest extends Ba
public void testThreadLocal()
{
- new Runnable(){
- public void run()
- {
- System.out.println(_errors[0]);
- }
- };
-
// Setup the threads
- Thread[] threads = new Thread[THREADS];
+ LogMessagesWithAConnectionActor[] threads = new LogMessagesWithAConnectionActor[THREADS];
for (int count = 0; count < THREADS; count++)
{
- Runnable test = new LogMessagesWithAConnectionActor(count);
- threads[count] = new Thread(test);
+ threads[count] = new LogMessagesWithAConnectionActor();
}
//Run the threads
@@ -198,10 +188,10 @@ public class CurrentActorTest extends Ba
// Verify that none of the tests threw an exception
for (int count = 0; count < THREADS; count++)
{
- if (_errors[count] != null)
+ if (threads[count].getException() != null)
{
- _errors[count].printStackTrace();
- fail("Error occured in thread:" + count);
+ threads[count].getException().printStackTrace();
+ fail("Error occured in thread:" + count + "("+threads[count].getException()+")");
}
}
}
@@ -210,13 +200,12 @@ public class CurrentActorTest extends Ba
* Creates a new ConnectionActor and logs the given number of messages
* before removing the actor and validating that there is no set actor.
*/
- public class LogMessagesWithAConnectionActor implements Runnable
+ public class LogMessagesWithAConnectionActor extends Thread
{
- int count;
+ Throwable _exception;
- LogMessagesWithAConnectionActor(int count)
+ public LogMessagesWithAConnectionActor()
{
- this.count = count;
}
public void run()
@@ -227,6 +216,7 @@ public class CurrentActorTest extends Ba
//fixme reminder that we need a better approach for broker testing.
try
{
+ LogActor defaultActor = CurrentActor.get();
AMQPConnectionActor actor = new AMQPConnectionActor(getSession(),
new NullRootMessageLogger());
@@ -237,20 +227,26 @@ public class CurrentActorTest extends Ba
sendTestLogMessage(CurrentActor.get());
// Verify it was the same actor as we set earlier
- assertEquals("Retrieved actor is not as expected ",
- actor, CurrentActor.get());
+ if(!actor.equals(CurrentActor.get()))
+ throw new IllegalArgumentException("Retrieved actor is not as expected ");
// Verify that removing the actor works for this thread
CurrentActor.remove();
- assertNull("CurrentActor should be null", CurrentActor.get());
+ if(CurrentActor.get() != defaultActor)
+ throw new IllegalArgumentException("CurrentActor ("+CurrentActor.get()+") should be default actor" + defaultActor);
}
- catch (Exception e)
+ catch (Throwable e)
{
- _errors[count] = e;
+ _exception = e;
}
}
+
+ public Throwable getException()
+ {
+ return _exception;
+ }
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Thu Mar 1 15:42:44 2012
@@ -64,17 +64,17 @@ public class AMQPriorityQueueTest extend
ArrayList<QueueEntry> msgs = _subscription.getMessages();
try
{
- assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber());
- assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber());
- assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber());
-
- assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber());
- assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber());
- assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber());
-
- assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber());
- assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber());
- assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber());
+ assertEquals(1L, msgs.get(0).getMessage().getMessageNumber());
+ assertEquals(6L, msgs.get(1).getMessage().getMessageNumber());
+ assertEquals(8L, msgs.get(2).getMessage().getMessageNumber());
+
+ assertEquals(2L, msgs.get(3).getMessage().getMessageNumber());
+ assertEquals(5L, msgs.get(4).getMessage().getMessageNumber());
+ assertEquals(7L, msgs.get(5).getMessage().getMessageNumber());
+
+ assertEquals(3L, msgs.get(6).getMessage().getMessageNumber());
+ assertEquals(4L, msgs.get(7).getMessage().getMessageNumber());
+ assertEquals(9L, msgs.get(8).getMessage().getMessageNumber());
}
catch (AssertionFailedError afe)
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu Mar 1 15:42:44 2012
@@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -168,22 +167,22 @@ public class MockAMQQueue implements AMQ
public UUID getId()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public QueueConfigType getConfigType()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public ConfiguredObject getParent()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public boolean isDurable()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isAutoDelete()
@@ -199,7 +198,7 @@ public class MockAMQQueue implements AMQ
public AMQShortString getOwner()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void setVirtualHost(VirtualHost virtualhost)
@@ -219,22 +218,22 @@ public class MockAMQQueue implements AMQ
public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void unregisterSubscription(Subscription subscription) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public int getConsumerCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public int getActiveConsumerCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public boolean hasExclusiveSubscriber()
@@ -244,37 +243,37 @@ public class MockAMQQueue implements AMQ
public boolean isUnused()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public boolean isEmpty()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public int getMessageCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public int getUndeliveredMessageCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public long getQueueDepth()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public long getReceivedMessageCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public long getOldestMessageArrivalTime()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public boolean isDeleted()
@@ -297,59 +296,58 @@ public class MockAMQQueue implements AMQ
}
+ public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
+ {
+ }
+
public void requeue(QueueEntry entry)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void requeue(QueueEntryImpl storeContext, Subscription subscription)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void dequeue(QueueEntry entry, Subscription sub)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public void addQueueDeleteTask(Task task)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void removeQueueDeleteTask(final Task task)
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public List<QueueEntry> getMessagesOnTheQueue()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public List<Long> getMessagesOnTheQueue(int num)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public List<Long> getMessagesOnTheQueue(int num, int offest)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public QueueEntry getMessageOnTheQueue(long messageId)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
@@ -359,146 +357,137 @@ public class MockAMQQueue implements AMQ
public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getMaximumMessageSize()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMaximumMessageSize(long value)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getMaximumMessageCount()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMaximumMessageCount(long value)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getMaximumQueueDepth()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMaximumQueueDepth(long value)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getMaximumMessageAge()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMaximumMessageAge(long maximumMessageAge)
{
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean getBlockOnQueueFull()
- {
- return false;
- }
-
- public void setBlockOnQueueFull(boolean block)
- {
+
}
public long getMinimumAlertRepeatGap()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void deleteMessageFromTop()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long clearQueue()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void checkMessageStatus() throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public Set<NotificationCheck> getNotificationChecks()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void flushSubscription(Subscription sub) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void deliverAsync(Subscription sub)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void deliverAsync()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void stop()
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public boolean isExclusive()
{
- return false; //To change body of implemented methods use File | Settings | File Templates.
+ return false;
}
public Exchange getAlternateExchange()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void setAlternateExchange(Exchange exchange)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public Map<String, Object> getArguments()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
- public void checkCapacity(AMQChannel channel)
+ public void checkCapacity(AMQSessionModel channel)
{
}
public ManagedObject getManagedObject()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public int compareTo(AMQQueue o)
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setMinimumAlertRepeatGap(long value)
@@ -508,22 +497,22 @@ public class MockAMQQueue implements AMQ
public long getCapacity()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setCapacity(long capacity)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public long getFlowResumeCapacity()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0;
}
public void setFlowResumeCapacity(long flowResumeCapacity)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void configure(ConfigurationPlugin config)
@@ -533,7 +522,7 @@ public class MockAMQQueue implements AMQ
public ConfigurationPlugin getConfiguration()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public AuthorizationHolder getAuthorizationHolder()
@@ -612,20 +601,20 @@ public class MockAMQQueue implements AMQ
}
- @Override
public int getMaximumDeliveryCount()
{
return 0;
}
- @Override
public void setMaximumDeliveryCount(int maximumDeliveryCount)
{
}
- @Override
public void setAlternateExchange(String exchangeName)
{
}
+ public void visit(final Visitor visitor)
+ {
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java Thu Mar 1 15:42:44 2012
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -107,7 +106,17 @@ public class MockStoredMessage implement
return src.limit();
}
- public TransactionLog.StoreFuture flushToStore()
+
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ getContent(offsetInMessage, buf);
+ buf.position(0);
+ return buf;
+ }
+
+ public MessageStore.StoreFuture flushToStore()
{
return MessageStore.IMMEDIATE_FUTURE;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Thu Mar 1 15:42:44 2012
@@ -164,7 +164,7 @@ public abstract class QueueEntryListTest
final QueueEntry head = getTestList().getHead();
assertNull("Head entry should not contain an actual message", head.getMessage());
assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head)
- .getMessage().getMessageNumber().longValue());
+ .getMessage().getMessageNumber());
}
/**
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Thu Mar 1 15:42:44 2012
@@ -649,9 +649,7 @@ public class SimpleAMQQueueTest extends
public void onRollback()
{
}
- });
-
-
+ }, 0L);
// Check that it is enqueued
AMQQueue data = _store.getMessages().get(1L);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java Thu Mar 1 15:42:44 2012
@@ -162,8 +162,8 @@ public class SimpleQueueEntryListTest ex
while (entry != null)
{
assertFalse("Entry " + entry.getMessage().getMessageNumber() + " should not have been deleted", entry.isDeleted());
- assertNotNull("QueueEntry was not found in the list of remaining entries",
- remainingMessages.get(entry.getMessage().getMessageNumber().intValue()));
+ assertNotNull("QueueEntry "+entry.getMessage().getMessageNumber()+" was not found in the list of remaining entries " + remainingMessages,
+ remainingMessages.get((int)(entry.getMessage().getMessageNumber())));
count++;
entry = entry.getNextNode();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Thu Mar 1 15:42:44 2012
@@ -317,7 +317,7 @@ public class SortedQueueEntryListTest ex
assertEquals("Sorted queue entry value is not as expected",
expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));
assertEquals("Sorted queue entry id is not as expected",
- Long.valueOf(expectedMessageId), entry.getMessage().getMessageNumber());
+ expectedMessageId, entry.getMessage().getMessageNumber());
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Thu Mar 1 15:42:44 2012
@@ -558,7 +558,7 @@ public class MessageStoreTest extends In
/**
* Delete the Store Environment path
*
- * @param configuration The configuration that contains the store environment path.
+ * @param environmentPath The configuration that contains the store environment path.
*/
private void cleanup(File environmentPath)
{
@@ -636,7 +636,7 @@ public class MessageStoreTest extends In
{
//To change body of implemented methods use File | Settings | File Templates.
}
- });
+ }, 0L);
}
}
@@ -710,7 +710,7 @@ public class MessageStoreTest extends In
if (queue.isDurable() && !queue.isAutoDelete())
{
- getVirtualHost().getMessageStore().createQueue(queue, queueArguments);
+ getVirtualHost().getDurableConfigurationStore().createQueue(queue, queueArguments);
}
}
catch (AMQException e)
@@ -754,7 +754,7 @@ public class MessageStoreTest extends In
getVirtualHost().getExchangeRegistry().registerExchange(exchange);
if (durable)
{
- getVirtualHost().getMessageStore().createExchange(exchange);
+ getVirtualHost().getDurableConfigurationStore().createExchange(exchange);
}
}
catch (AMQException e)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Thu Mar 1 15:42:44 2012
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQStoreException
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.exchange.Exchange;
@@ -42,18 +43,11 @@ import java.nio.ByteBuffer;
*/
public class SkeletonMessageStore implements MessageStore
{
- private final AtomicLong _messageId = new AtomicLong(1);
-
- public void configure(String base, Configuration config) throws Exception
- {
- }
-
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
Configuration config,
LogSubject logSubject) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void configureMessageStore(String name,
@@ -61,7 +55,6 @@ public class SkeletonMessageStore implem
Configuration config,
LogSubject logSubject) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
}
public void close() throws Exception
@@ -70,31 +63,28 @@ public class SkeletonMessageStore implem
public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
- public void removeMessage(Long messageId)
- {
- }
public void createExchange(Exchange exchange) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void removeExchange(Exchange exchange) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void createQueue(AMQQueue queue) throws AMQStoreException
@@ -105,63 +95,11 @@ public class SkeletonMessageStore implem
{
}
-
-
-
- public List<AMQQueue> createQueues() throws AMQException
- {
- return null;
- }
-
- public Long getNewMessageId()
- {
- return _messageId.getAndIncrement();
- }
-
- public void storeContentBodyChunk(
- Long messageId,
- int index,
- ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
- {
-
- }
-
- public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
- {
-
- }
-
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
- {
- return null;
- }
-
- public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
- {
- return null;
- }
-
public boolean isPersistent()
{
return false;
}
- public void storeMessageHeader(Long messageNumber, ServerMessage message)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void storeContent(Long messageNumber, long offset, ByteBuffer body)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public ServerMessage getMessage(Long messageNumber)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
@@ -172,7 +110,7 @@ public class SkeletonMessageStore implem
Configuration storeConfiguration,
LogSubject logSubject) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public Transaction newTransaction()
@@ -180,19 +118,19 @@ public class SkeletonMessageStore implem
return new Transaction()
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public void commitTran() throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
public StoreFuture commitTranAsync() throws AMQStoreException
@@ -213,7 +151,7 @@ public class SkeletonMessageStore implem
public void abortTran() throws AMQStoreException
{
- //To change body of implemented methods use File | Settings | File Templates.
+
}
};
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java Thu Mar 1 15:42:44 2012
@@ -82,6 +82,12 @@ public class TestMemoryMessageStore exte
return _storedMessage.getContent(offsetInMessage, dst);
}
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return _storedMessage.getContent(offsetInMessage, size);
+ }
+
public StoreFuture flushToStore()
{
return _storedMessage.flushToStore();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Thu Mar 1 15:42:44 2012
@@ -25,6 +25,8 @@ import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
/**
@@ -66,14 +68,14 @@ public class TestableMemoryMessageStore
private class TestableTransaction implements Transaction
{
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- getMessages().put(messageId, (AMQQueue)queue);
+ getMessages().put(message.getMessageNumber(), (AMQQueue)queue);
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- getMessages().remove(messageId);
+ getMessages().remove(message.getMessageNumber());
}
public void commitTran() throws AMQStoreException
@@ -143,6 +145,12 @@ public class TestableMemoryMessageStore
return _storedMessage.getContent(offsetInMessage, dst);
}
+
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return _storedMessage.getContent(offsetInMessage, size);
+ }
+
public StoreFuture flushToStore()
{
return _storedMessage.flushToStore();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Thu Mar 1 15:42:44 2012
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -44,7 +44,7 @@ public class AutoCommitTransactionTest e
{
private ServerTransaction _transaction = null; // Class under test
- private TransactionLog _transactionLog;
+ private MessageStore _transactionLog;
private AMQQueue _queue;
private List<AMQQueue> _queues;
private Collection<QueueEntry> _queueEntries;
@@ -137,7 +137,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(false);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action);
+ _transaction.enqueue(_queues, _message, _action, 0L);
assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -157,7 +157,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action);
+ _transaction.enqueue(_queues, _message, _action, 0L);
assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -175,7 +175,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, true, false, true});
- _transaction.enqueue(_queues, _message, _action);
+ _transaction.enqueue(_queues, _message, _action, 0L);
assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
@@ -198,7 +198,7 @@ public class AutoCommitTransactionTest e
try
{
- _transaction.enqueue(_queues, _message, _action);
+ _transaction.enqueue(_queues, _message, _action, 0L);
fail("Exception not thrown");
}
catch (RuntimeException re)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Thu Mar 1 15:42:44 2012
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -51,7 +51,7 @@ public class LocalTransactionTest extend
private MockAction _action1;
private MockAction _action2;
private MockStoreTransaction _storeTransaction;
- private TransactionLog _transactionLog;
+ private MessageStore _transactionLog;
@Override
@@ -140,7 +140,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(false);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action1);
+ _transaction.enqueue(_queues, _message, _action1, 0L);
assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -156,7 +156,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action1);
+ _transaction.enqueue(_queues, _message, _action1, 0L);
assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -173,7 +173,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, true, false, true});
- _transaction.enqueue(_queues, _message, _action1);
+ _transaction.enqueue(_queues, _message, _action1, 0L);
assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
@@ -196,7 +196,7 @@ public class LocalTransactionTest extend
try
{
- _transaction.enqueue(_queues, _message, _action1);
+ _transaction.enqueue(_queues, _message, _action1, 0L);
fail("Exception not thrown");
}
catch (RuntimeException re)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Thu Mar 1 15:42:44 2012
@@ -27,11 +27,12 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.StoredMessage;
/**
* Mock Server Message allowing its persistent flag to be controlled from test.
*/
-class MockServerMessage implements ServerMessage<MockServerMessage>
+class MockServerMessage implements ServerMessage
{
/**
*
@@ -83,6 +84,11 @@ class MockServerMessage implements Serve
throw new NotImplementedException();
}
+ public StoredMessage getStoredMessage()
+ {
+ throw new NotImplementedException();
+ }
+
public long getExpiration()
{
throw new NotImplementedException();
@@ -93,12 +99,18 @@ class MockServerMessage implements Serve
throw new NotImplementedException();
}
+
+ public ByteBuffer getContent(int offset, int size)
+ {
+ throw new NotImplementedException();
+ }
+
public long getArrivalTime()
{
throw new NotImplementedException();
}
- public Long getMessageNumber()
+ public long getMessageNumber()
{
return 0L;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java Thu Mar 1 15:42:44 2012
@@ -24,11 +24,11 @@ import org.apache.commons.configuration.
import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.TransactionLog.StoreFuture;
-import org.apache.qpid.server.store.TransactionLog.Transaction;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.*;
+import org.apache.qpid.server.store.MessageStore.StoreFuture;
+import org.apache.qpid.server.store.MessageStore.Transaction;
/**
* Mock implementation of a (Store) Transaction allow its state to be observed.
@@ -61,7 +61,7 @@ class MockStoreTransaction implements Tr
return _state;
}
- public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
if (_throwExceptionOnQueueOp)
{
@@ -82,7 +82,7 @@ class MockStoreTransaction implements Tr
return _numberOfEnqueuedMessages;
}
- public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
if (_throwExceptionOnQueueOp)
{
@@ -107,10 +107,33 @@ class MockStoreTransaction implements Tr
_state = TransactionState.ABORTED;
}
- public static TransactionLog createTestTransactionLog(final MockStoreTransaction storeTransaction)
+ public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction)
{
- return new TransactionLog()
+ return new MessageStore()
{
+ public void configureMessageStore(final String name,
+ final MessageStoreRecoveryHandler recoveryHandler,
+ final Configuration config,
+ final LogSubject logSubject) throws Exception
+ {
+ //TODO.
+ }
+
+ public void close() throws Exception
+ {
+ //TODO.
+ }
+
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+ {
+ return null; //TODO.
+ }
+
+ public boolean isPersistent()
+ {
+ return false; //TODO.
+ }
+
public void configureTransactionLog(String name, TransactionLogRecoveryHandler recoveryHandler,
Configuration storeConfiguration, LogSubject logSubject) throws Exception
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Thu Mar 1 15:42:44 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.binding.BindingFactory;
@@ -41,7 +42,6 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
@@ -66,6 +66,11 @@ public class MockVirtualHost implements
}
+ public BrokerLink createBrokerConnection(final UUID id, final long createTime, final Map<String, String> arguments)
+ {
+ return null;
+ }
+
public IApplicationRegistry getApplicationRegistry()
{
return null;
@@ -161,10 +166,6 @@ public class MockVirtualHost implements
return null;
}
- public TransactionLog getTransactionLog()
- {
- return null;
- }
public void removeBrokerConnection(BrokerLink brokerLink)
{
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/velocity/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Mar 1 15:42:44 2012
@@ -0,0 +1,2 @@
+*.iml
+
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/xsl/qmf.xsl
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/xsl/qmf.xsl?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/xsl/qmf.xsl (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/xsl/qmf.xsl Thu Mar 1 15:42:44 2012
@@ -288,6 +288,17 @@ public class <xsl:value-of select="$Clas
<xsl:apply-templates select="node()[name()='property' or name()='statistic']" mode="optionalPropertyPresence"/>
<xsl:apply-templates select="node()[name()='property' or name()='statistic']" mode="encodeProperty"/>
}
+
+ public String toString()
+ {
+ return "QMF<xsl:value-of select="@name"/>GetQueryResponseCommand{id=" + getObject().getId()
+<xsl:for-each select="node()[name()='property' or name()='statistic']">
+<xsl:if test="@type!='hilo32' and @type!='mmaTime' ">
+ + ", <xsl:value-of select="@name"/>=" + getObject().get<xsl:call-template name="initCap"><xsl:with-param name="input"><xsl:value-of select="@name"/></xsl:with-param></xsl:call-template>()
+</xsl:if>
+</xsl:for-each>
+ + "}";
+ }
}
@@ -530,6 +541,11 @@ public class <xsl:value-of select="$Clas
{
return obj.<xsl:value-of select="@name"/>( new <xsl:value-of select="$ClassName"/>ResponseCommandFactory(cmd)<xsl:if test="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]">, </xsl:if><xsl:apply-templates select="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]" mode="methodArgList"><xsl:with-param name="prefix">_</xsl:with-param></xsl:apply-templates> );
}
+
+ public String toString()
+ {
+ return "<xsl:value-of select="$ClassName"/>["<xsl:for-each select="node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]"><xsl:if test="preceding-sibling::node()[name()='arg' and ( @dir='I' or @dir='IO' ) ]">+ ", "</xsl:if>+ "<xsl:value-of select="@name"/> = " + _<xsl:value-of select="@name"/> </xsl:for-each>+"]";
+ }
}
public final class <xsl:value-of select="$ClassName"/>ResponseCommandFactory
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps Thu Mar 1 15:42:44 2012
@@ -146,7 +146,7 @@ management-common.test.libs=${test.libs}
ra.libs=${geronimo-j2ee} ${geronimo-jta} ${geronimo-jms} ${slf4j-api} ${geronimo-kernel} ${geronimo-openejb}
# optional bdbstore module deps
-bdb-je=lib/bdbstore/je-4.0.117.jar
+bdb-je=lib/bdbstore/je-5.0.34.jar
bdbstore.libs=${bdb-je}
bdbstore.test.libs=${test.libs}
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/example/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Mar 1 15:42:44 2012
@@ -0,0 +1,2 @@
+*.iml
+
Propchange: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Thu Mar 1 15:42:44 2012
@@ -0,0 +1,2 @@
+*.iml
+
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Mar 1 15:42:44 2012
@@ -308,7 +308,7 @@ public class AMQConnectionDelegate_8_0 i
{
AMQSession s = (AMQSession) it.next();
// _protocolHandler.addSessionByChannel(s.getChannelId(), s);
- reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+ reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted());
s.resubscribe();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Mar 1 15:42:44 2012
@@ -829,7 +829,7 @@ public abstract class AMQDestination imp
dest.setSubject(_subject);
dest.setCreate(_create);
dest.setAssert(_assert);
- dest.setDelete(_create);
+ dest.setDelete(_delete);
dest.setBrowseOnly(_browseOnly);
dest.setAddressType(_addressType);
dest.setTargetNode(_targetNode);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar 1 15:42:44 2012
@@ -256,7 +256,7 @@ public abstract class AMQSession<C exten
protected AMQConnection _connection;
/** Used to indicate whether or not this is a transactional session. */
- protected boolean _transacted;
+ protected final boolean _transacted;
/** Holds the sessions acknowledgement mode. */
protected final int _acknowledgeMode;
@@ -371,7 +371,7 @@ public abstract class AMQSession<C exten
* Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
* to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
*/
- private volatile boolean _usingDispatcherForCleanup;
+ protected volatile boolean _usingDispatcherForCleanup;
/** Used to indicates that the connection to which this session belongs, has been stopped. */
private boolean _connectionStopped;
@@ -1583,6 +1583,11 @@ public abstract class AMQSession<C exten
return _prefetchLowMark;
}
+ public int getPrefetch()
+ {
+ return _prefetchHighMark;
+ }
+
public AMQShortString getDefaultQueueExchangeName()
{
return _connection.getDefaultQueueExchangeName();
@@ -1614,7 +1619,24 @@ public abstract class AMQSession<C exten
return _ticket;
}
- public boolean getTransacted()
+ /**
+ * Indicates whether the session is in transacted mode.
+ *
+ * @return true if the session is in transacted mode
+ * @throws IllegalStateException - if session is closed.
+ */
+ public boolean getTransacted() throws JMSException
+ {
+ // Sun TCK checks that javax.jms.IllegalStateException is thrown for closed session
+ // nowhere else this behavior is documented
+ checkNotClosed();
+ return _transacted;
+ }
+
+ /**
+ * Indicates whether the session is in transacted mode.
+ */
+ public boolean isTransacted()
{
return _transacted;
}
@@ -3047,7 +3069,7 @@ public abstract class AMQSession<C exten
*/
public boolean prefetch()
{
- return getAMQConnection().getMaxPrefetch() > 0;
+ return _prefetchHighMark > 0;
}
/** Signifies that the session has pending sends to commit. */
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Mar 1 15:42:44 2012
@@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -60,23 +59,7 @@ import org.apache.qpid.filter.MessageFil
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.ExchangeQueryResult;
-import org.apache.qpid.transport.ExecutionErrorCode;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.QueueQueryResult;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.*;
import org.apache.qpid.util.Serial;
import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
@@ -141,13 +124,13 @@ public class AMQSession_0_10 extends AMQ
private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
private TimerTask flushTask = null;
- private RangeSet unacked = new RangeSet();
+ private RangeSet unacked = RangeSetFactory.createRangeSet();
private int unackedCount = 0;
/**
* Used to store the range of in tx messages
*/
- private final RangeSet _txRangeSet = new RangeSet();
+ private final RangeSet _txRangeSet = RangeSetFactory.createRangeSet();
private int _txSize = 0;
//--- constructors
@@ -460,7 +443,7 @@ public class AMQSession_0_10 extends AMQ
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
- RangeSet all = new RangeSet();
+ RangeSet all = RangeSetFactory.createRangeSet();
RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
@@ -483,7 +466,7 @@ public class AMQSession_0_10 extends AMQ
private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags)
{
- RangeSet ranges = new RangeSet();
+ RangeSet ranges = RangeSetFactory.createRangeSet();
while (true)
{
Long tag = messageTags.poll();
@@ -518,7 +501,7 @@ public class AMQSession_0_10 extends AMQ
public void rejectMessage(long deliveryTag, boolean requeue)
{
// The value of requeue is always true
- RangeSet ranges = new RangeSet();
+ RangeSet ranges = RangeSetFactory.createRangeSet();
ranges.add((int) deliveryTag);
flushProcessed(ranges, false);
if (requeue)
@@ -812,11 +795,43 @@ public class AMQSession_0_10 extends AMQ
{
if (suspend)
{
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
- Option.UNRELIABLE);
- }
+ synchronized (getMessageDeliveryLock())
+ {
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
+ sync();
+ List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+ _prefetchedMessageTags.addAll(tags);
+ }
+ }
+
+ _usingDispatcherForCleanup = true;
+ syncDispatchQueue();
+ _usingDispatcherForCleanup = false;
+
+ RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+ RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+ + prefetched.size());
+
+ for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+ {
+ Range range = deliveredIter.next();
+ all.add(range);
+ }
+
+ for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+ {
+ Range range = prefetchedIter.next();
+ all.add(range);
+ }
+
+ flushProcessed(all, false);
+ getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
+ getQpidSession().messageRelease(prefetched);
+ sync();
}
else
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Mar 1 15:42:44 2012
@@ -21,6 +21,8 @@
package org.apache.qpid.client;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.Map;
@@ -80,7 +82,7 @@ import org.apache.qpid.transport.Transpo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
+public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -96,8 +98,8 @@ public final class AMQSession_0_8 extend
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
*/
- AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+ protected AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+ MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
@@ -150,7 +152,7 @@ public final class AMQSession_0_8 extend
_logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
}
- getProtocolHandler().writeFrame(ackFrame);
+ getProtocolHandler().writeFrame(ackFrame, !isTransacted());
_unacknowledgedMessageTags.remove(deliveryTag);
}
@@ -512,7 +514,7 @@ public final class AMQSession_0_8 extend
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage =
_messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
- msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+ msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache);
AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
AMQShortString reason = msg.getReplyText();
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
@@ -572,6 +574,16 @@ public final class AMQSession_0_8 extend
}, _connection).execute();
}
+ public DestinationCache<AMQQueue> getQueueDestinationCache()
+ {
+ return _queueDestinationCache;
+ }
+
+ public DestinationCache<AMQTopic> getTopicDestinationCache()
+ {
+ return _topicDestinationCache;
+ }
+
class QueueDeclareOkHandler extends SpecificMethodFrameListener
{
@@ -613,12 +625,12 @@ public final class AMQSession_0_8 extend
return okHandler._messageCount;
}
- protected final boolean tagLE(long tag1, long tag2)
+ protected boolean tagLE(long tag1, long tag2)
{
return tag1 <= tag2;
}
- protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
+ protected boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return false;
}
@@ -695,4 +707,55 @@ public final class AMQSession_0_8 extend
return null;
}
}
+
+ public abstract static class DestinationCache<T extends AMQDestination>
+ {
+ private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>();
+
+ public T getDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ Map<AMQShortString, T> routingMap = cache.get(exchangeName);
+ if(routingMap == null)
+ {
+ routingMap = new LinkedHashMap<AMQShortString, T>()
+ {
+
+ protected boolean removeEldestEntry(Map.Entry<AMQShortString, T> eldest)
+ {
+ return size() >= 200;
+ }
+ };
+ cache.put(exchangeName,routingMap);
+ }
+ T destination = routingMap.get(routingKey);
+ if(destination == null)
+ {
+ destination = newDestination(exchangeName, routingKey);
+ routingMap.put(routingKey,destination);
+ }
+ return destination;
+ }
+
+ protected abstract T newDestination(AMQShortString exchangeName, AMQShortString routingKey);
+ }
+
+ private static class TopicDestinationCache extends DestinationCache<AMQTopic>
+ {
+ protected AMQTopic newDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ return new AMQTopic(exchangeName, routingKey, null);
+ }
+ }
+
+ private static class QueueDestinationCache extends DestinationCache<AMQQueue>
+ {
+ protected AMQQueue newDestination(AMQShortString exchangeName, AMQShortString routingKey)
+ {
+ return new AMQQueue(exchangeName, routingKey, routingKey);
+ }
+ }
+
+ private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache();
+ private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache();
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Mar 1 15:42:44 2012
@@ -272,10 +272,8 @@ public class BasicMessageConsumer_0_10 e
*/
private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
{
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
_0_10session.messageAcknowledge
- (ranges,
+ (Range.newInstance((int) message.getDeliveryTag()),
_acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
final AMQException amqe = _0_10session.getCurrentException();
@@ -294,9 +292,7 @@ public class BasicMessageConsumer_0_10 e
*/
private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
{
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.flushProcessed(ranges,false);
+ _0_10session.flushProcessed(Range.newInstance((int) message.getDeliveryTag()),false);
final AMQException amqe = _0_10session.getCurrentException();
if (amqe != null)
@@ -315,10 +311,8 @@ public class BasicMessageConsumer_0_10 e
private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- final RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ final Acquired acq = _0_10session.getQpidSession().messageAcquire(Range.newInstance((int)message.getDeliveryTag())).get();
final RangeSet acquired = acq.getTransfers();
if (acquired != null && acquired.size() > 0)
@@ -451,7 +445,7 @@ public class BasicMessageConsumer_0_10 e
{
if (_synchronousQueue.size() > 0)
{
- RangeSet ranges = new RangeSet();
+ RangeSet ranges = RangeSetFactory.createRangeSet();
Iterator iterator = _synchronousQueue.iterator();
while (iterator.hasNext())
{
@@ -551,7 +545,7 @@ public class BasicMessageConsumer_0_10 e
}
else if (getSession().prefetch())
{
- capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ capacity = getSession().getPrefetch();
}
return capacity;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Mar 1 15:42:44 2012
@@ -38,11 +38,13 @@ import org.slf4j.LoggerFactory;
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
+ private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;
private final RejectBehaviour _rejectBehaviour;
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession_0_8 session,
AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
@@ -60,6 +62,9 @@ public class BasicMessageConsumer_0_8 ex
consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+ _topicDestinationCache = session.getTopicDestinationCache();
+ _queueDestinationCache = session.getQueueDestinationCache();
+
if (destination.getRejectBehaviour() != null)
{
_rejectBehaviour = destination.getRejectBehaviour();
@@ -100,7 +105,8 @@ public class BasicMessageConsumer_0_8 ex
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
+ _queueDestinationCache, _topicDestinationCache);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Mar 1 15:42:44 2012
@@ -40,6 +40,7 @@ import org.apache.qpid.framing.Composite
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.MethodRegistry;
public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
@@ -53,15 +54,17 @@ public class BasicMessageProducer_0_8 ex
void declareDestination(AMQDestination destination)
{
- ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getExchangeClass(),
- false,
- false,
- false,
- false,
- true,
- null);
+ final MethodRegistry methodRegistry = getSession().getMethodRegistry();
+ ExchangeDeclareBody body =
+ methodRegistry.createExchangeDeclareBody(_session.getTicket(),
+ destination.getExchangeName(),
+ destination.getExchangeClass(),
+ destination.getExchangeName().toString().startsWith("amq."),
+ false,
+ false,
+ false,
+ true,
+ null);
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org