You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC
svn commit: r827724 [8/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apach...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Tue Oct 20 16:23:01 2009
@@ -36,13 +36,16 @@
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
public class SimpleAMQQueueTest extends TestCase
{
@@ -94,7 +97,7 @@
ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance();
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), env), _store);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -271,7 +274,7 @@
AMQMessage message = createMessage(id);
_queue.enqueue(message);
QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
- entry.setRedelivered(true);
+ entry.setRedelivered();
_queue.resend(entry, _subscription);
}
@@ -399,23 +402,25 @@
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- final IncomingMessage msg = new IncomingMessage(1L, info, null);
+ final IncomingMessage msg = new IncomingMessage(info);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
+
final ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
// Send persistent message
qs.add(_queue);
- msg.routingComplete(_store, new MessageHandleFactory());
- _store.storeMessageMetaData(new Long(1L), new MessageMetaData(info, contentHeaderBody, 1));
+ MessageMetaData metaData = msg.headersReceived();
+ StoredMessage handle = _store.addMessage(metaData);
+ msg.setStoredMessage(handle);
- Transaction txn = new AutoCommitTransaction(_store);
+ ServerTransaction txn = new AutoCommitTransaction(_store);
- txn.enqueue(qs, msg, new Transaction.Action()
+ txn.enqueue(qs, msg, new ServerTransaction.Action()
{
public void postCommit()
{
@@ -435,7 +440,7 @@
// Dequeue message
MockQueueEntry entry = new MockQueueEntry();
- AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory());
+ AMQMessage amqmsg = new AMQMessage(handle);
entry.setMessage(amqmsg);
_queue.dequeue(entry);
@@ -446,23 +451,12 @@
}
- // FIXME: move this to somewhere useful
- private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody)
- {
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- null,
- false);
-
- amqMessageHandle.setPublishAndContentHeaderBody(publishBody, contentHeaderBody);
- return amqMessageHandle;
- }
-
public class TestMessage extends AMQMessage
{
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody)
throws AMQException
{
this(tag, messageId, publishBody, new ContentHeaderBody(1, 1, new BasicContentHeaderProperties(), 0));
@@ -471,7 +465,7 @@
TestMessage(long tag, long messageId, MessagePublishInfo publishBody, ContentHeaderBody chb)
throws AMQException
{
- super(createMessageHandle(messageId, publishBody, chb), chb, 0, publishBody);
+ super(new MockStoredMessage(messageId, publishBody, chb));
_tag = tag;
}
@@ -481,7 +475,7 @@
return true;
}
- public void decrementReference(StoreContext context)
+ public void decrementReference()
{
_count--;
}
@@ -494,7 +488,7 @@
protected AMQMessage createMessage(Long id) throws AMQException
{
- AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
+ AMQMessage messageA = new TestMessage(id, id, info);
return messageA;
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java Tue Oct 20 16:23:01 2009
@@ -27,8 +27,6 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.AMQException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class SimpleAMQQueueThreadPoolTest extends TestCase
{
@@ -47,7 +45,7 @@
assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown());
assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
-
+
queue.stop();
assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount());
@@ -55,6 +53,6 @@
finally
{
ApplicationRegistry.remove();
- }
+ }
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java Tue Oct 20 16:23:01 2009
@@ -57,11 +57,11 @@
BufferedWriter out = new BufferedWriter(new FileWriter(tmpFile));
out.write("<security><queueDenier>notyet</queueDenier><exchangeDenier>yes</exchangeDenier></security>");
out.close();
-
+
_conf = new SecurityConfiguration(new XMLConfiguration(tmpFile));
-
+
// Create ACLManager
-
+
_pluginManager = new MockPluginManager("");
_authzManager = new ACLManager(_conf, _pluginManager);
@@ -79,15 +79,15 @@
// Correctly Close the AR we created
ApplicationRegistry.remove();
super.tearDown();
- }
-
+ }
+
public void testACLManagerConfigurationPluginManager() throws Exception
{
AMQQueue queue = new MockAMQQueue("notyet");
AMQQueue otherQueue = new MockAMQQueue("other");
-
+
assertFalse(_authzManager.authoriseDelete(_session, queue));
-
+
// This should only be denied if the config hasn't been correctly passed in
assertTrue(_authzManager.authoriseDelete(_session, otherQueue));
assertTrue(_authzManager.authorisePurge(_session, queue));
@@ -96,11 +96,11 @@
public void testACLManagerConfigurationPluginManagerACLPlugin() throws ConfigurationException
{
_authzManager = new ACLManager(_conf, _pluginManager, ExchangeDenier.FACTORY);
-
+
Exchange exchange = null;
assertFalse(_authzManager.authoriseDelete(_session, exchange));
}
-
+
public void testConfigurePlugins() throws ConfigurationException
{
Configuration hostConfig = new PropertiesConfiguration();
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java Tue Oct 20 16:23:01 2009
@@ -14,17 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.security.access;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.plugins.AllowAll;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
public class ExchangeDenier extends AllowAll
{
@@ -41,7 +40,7 @@
return new ExchangeDenier();
}
};
-
+
@Override
public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java Tue Oct 20 16:23:01 2009
@@ -27,14 +27,13 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.amqp_0_9.ExchangeDeclareBodyImpl;
-import org.apache.qpid.framing.amqp_0_9.QueueDeclareBodyImpl;
import org.apache.qpid.framing.amqp_8_0.QueueBindBodyImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -43,7 +42,7 @@
private String _user = "user";
private PrincipalPermissions _perms;
-
+
// Common things that are passed to frame constructors
private AMQShortString _queueName = new AMQShortString(this.getClass().getName()+"queue");
private AMQShortString _exchangeName = new AMQShortString("amq.direct");
@@ -62,21 +61,21 @@
private AMQShortString _owner = new AMQShortString(this.getClass().getName()+"owner");
private AMQQueue _queue;
private Boolean _temporary = false;
-
+
@Override
public void setUp()
{
//Highlight that this test will cause a new AR to be created
- ApplicationRegistry.getInstance();
+ ApplicationRegistry.getInstance();
_perms = new PrincipalPermissions(_user);
- try
+ try
{
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration("test", env));
_exchange = DirectExchange.TYPE.newInstance(_virtualHost, _exchangeName, _durable, _ticket, _autoDelete);
_queue = AMQQueueFactory.createAMQQueueImpl(_queueName, false, _owner , false, _virtualHost, _arguments);
- }
+ }
catch (Exception e)
{
fail(e.getMessage());
@@ -103,7 +102,7 @@
{
QueueBindBodyImpl bind = new QueueBindBodyImpl(_ticket, _queueName, _exchangeName, _routingKey, _nowait, _arguments);
Object[] args = new Object[]{bind, _exchange, _queue, _routingKey};
-
+
assertEquals(AuthzResult.DENIED, _perms.authorise(Permission.BIND, args));
_perms.grant(Permission.BIND, (Object[]) null);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.BIND, args));
@@ -113,7 +112,7 @@
{
Object[] grantArgs = new Object[]{_temporary , _queueName, _exchangeName, _routingKey};
Object[] authArgs = new Object[]{_autoDelete, _queueName};
-
+
assertEquals(AuthzResult.DENIED, _perms.authorise(Permission.CREATEQUEUE, authArgs));
_perms.grant(Permission.CREATEQUEUE, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEQUEUE, authArgs));
@@ -128,41 +127,41 @@
_perms.grant(Permission.CREATEQUEUE, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEQUEUE, authArgs));
}
-
+
// FIXME disabled, this fails due to grant putting the grant into the wrong map QPID-1598
public void disableTestExchangeCreate()
{
- ExchangeDeclareBodyImpl exchangeDeclare =
+ ExchangeDeclareBodyImpl exchangeDeclare =
new ExchangeDeclareBodyImpl(_ticket, _exchangeName, _exchangeType, _passive, _durable,
_autoDelete, _internal, _nowait, _arguments);
Object[] authArgs = new Object[]{exchangeDeclare};
Object[] grantArgs = new Object[]{_exchangeName, _exchangeType};
-
+
assertEquals(AuthzResult.DENIED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs));
_perms.grant(Permission.CREATEEXCHANGE, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CREATEEXCHANGE, authArgs));
}
-
+
public void testConsume()
{
Object[] authArgs = new Object[]{_queue};
Object[] grantArgs = new Object[]{_queueName, _temporary, _temporary};
-
+
/* FIXME: This throws a null pointer exception QPID-1599
* assertFalse(_perms.authorise(Permission.CONSUME, authArgs));
*/
_perms.grant(Permission.CONSUME, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.CONSUME, authArgs));
}
-
+
public void testPublish()
{
Object[] authArgs = new Object[]{_exchange, _routingKey};
Object[] grantArgs = new Object[]{_exchange.getName(), _routingKey};
-
+
assertEquals(AuthzResult.DENIED, _perms.authorise(Permission.PUBLISH, authArgs));
_perms.grant(Permission.PUBLISH, grantArgs);
assertEquals(AuthzResult.ALLOWED, _perms.authorise(Permission.PUBLISH, authArgs));
}
-
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java Tue Oct 20 16:23:01 2009
@@ -14,22 +14,20 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.security.access;
import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.security.access.plugins.AllowAll;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
public class QueueDenier extends AllowAll
{
-
+
public static final ACLPluginFactory FACTORY = new ACLPluginFactory()
{
public boolean supportsTag(String name)
@@ -44,18 +42,18 @@
return plugin;
}
};
-
+
private String _queueName = "";
-
+
@Override
public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
{
if (!(queue.getName().toString().equals(_queueName)))
{
return AuthzResult.ALLOWED;
- }
- else
+ }
+ else
{
return AuthzResult.DENIED;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Tue Oct 20 16:23:01 2009
@@ -29,11 +29,13 @@
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.queue.*;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -97,7 +99,7 @@
try
{
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), configuration));
+ _virtualHost = new VirtualHostImpl(new VirtualHostConfiguration(getClass().getName(), configuration));
ApplicationRegistry.getInstance().getVirtualHostRegistry().registerVirtualHost(_virtualHost);
}
catch (Exception e)
@@ -163,7 +165,7 @@
Exchange topicExchange = createExchange(TopicExchange.TYPE, topicExchangeName, true);
bindAllTopicQueuesToExchange(topicExchange, topicRouting);
- //Send Message To NonDurable direct Exchange = persistent
+ //Send Message To NonDurable direct Exchange = persistent
sendMessageOnExchange(nonDurableExchange, directRouting, true);
// and non-persistent
sendMessageOnExchange(nonDurableExchange, directRouting, false);
@@ -340,18 +342,8 @@
final IncomingMessage currentMessage;
- try
- {
- currentMessage = new IncomingMessage(_virtualHost.getMessageStore().getNewMessageId(),
- messageInfo,
- new InternalTestProtocolSession(_virtualHost));
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- //help compiler - next line never reached
- throw new RuntimeException();
- }
+
+ currentMessage = new IncomingMessage(messageInfo);
currentMessage.setExchange(directExchange);
@@ -372,31 +364,25 @@
currentMessage.setExpiration();
+ MessageMetaData mmd = currentMessage.headersReceived();
+ currentMessage.setStoredMessage(_virtualHost.getMessageStore().addMessage(mmd));
currentMessage.route();
- try
- {
- currentMessage.routingComplete(_virtualHost.getMessageStore(), new MessageHandleFactory());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
// check and deliver if header says body length is zero
if (currentMessage.allContentReceived())
{
// TODO Deliver to queues
- Transaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore());
+ ServerTransaction trans = new AutoCommitTransaction(_virtualHost.getMessageStore());
final List<AMQQueue> destinationQueues = currentMessage.getDestinationQueues();
- trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new Transaction.Action() {
+ trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
public void postCommit()
{
try
{
- AMQMessage message = new AMQMessage(currentMessage.getMessageHandle(), currentMessage.getContentHeader(), currentMessage.getSize() ,currentMessage.getMessagePublishInfo());
+ AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
for(AMQQueue queue : destinationQueues)
{
@@ -405,7 +391,7 @@
}
catch (AMQException e)
{
- e.printStackTrace();
+ e.printStackTrace();
}
}
@@ -502,14 +488,7 @@
fail(e.getMessage());
}
- try
- {
- _virtualHost.getQueueRegistry().registerQueue(queue);
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ _virtualHost.getQueueRegistry().registerQueue(queue);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Tue Oct 20 16:23:01 2009
@@ -25,12 +25,11 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.logging.LogSubject;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -47,8 +46,19 @@
public void configure(String base, Configuration config) throws Exception
{
}
-
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -57,6 +67,11 @@
{
}
+ public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void removeMessage(Long messageId)
{
}
@@ -87,41 +102,10 @@
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
- }
-
- public void beginTran(StoreContext s) throws AMQException
- {
- }
-
- public boolean inTran(StoreContext sc)
- {
- return false;
}
- public void commitTran(StoreContext storeContext) throws AMQException
- {
- }
- public StoreFuture commitTranAsync(StoreContext context) throws AMQException
- {
- commitTran(context);
- return new StoreFuture()
- {
- public boolean isComplete()
- {
- return true;
- }
-
- public void waitForCompletion()
- {
-
- }
- };
- }
- public void abortTran(StoreContext storeContext) throws AMQException
- {
- }
public List<AMQQueue> createQueues() throws AMQException
{
@@ -182,13 +166,55 @@
}
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration,
+ LogSubject logSubject) throws Exception
{
-
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
- {
-
+ public Transaction newTransaction()
+ {
+ return new Transaction()
+ {
+
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void commitTran() throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+ }
+
+ public void abortTran() throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ };
}
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java Tue Oct 20 16:23:01 2009
@@ -20,32 +20,79 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
+import java.nio.ByteBuffer;
/**
* Adds some extra methods to the memory message store for testing purposes.
*/
public class TestMemoryMessageStore extends MemoryMessageStore
{
+ private AtomicInteger _messageCount = new AtomicInteger(0);
+
+
public TestMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
}
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ @Override
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
{
- return _metaDataMap;
+ return new TestableStoredMessage(super.addMessage(metaData));
}
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
+ public int getMessageCount()
{
- return _contentBodyMap;
+ return _messageCount.get();
+ }
+
+ private class TestableStoredMessage implements StoredMessage
+ {
+ private final StoredMessage _storedMessage;
+
+ public TestableStoredMessage(StoredMessage storedMessage)
+ {
+ _messageCount.incrementAndGet();
+ _storedMessage = storedMessage;
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ return _storedMessage.getMetaData();
+ }
+
+ public long getMessageNumber()
+ {
+ return _storedMessage.getMessageNumber();
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ _storedMessage.addContent(offsetInMessage, src);
+ }
+
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return _storedMessage.getContent(offsetInMessage, dst);
+ }
+
+ public StoreFuture flushToStore()
+ {
+ return _storedMessage.flushToStore();
+ }
+
+ public void remove()
+ {
+ _storedMessage.remove();
+ _messageCount.decrementAndGet();
+ }
+
}
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Tue Oct 20 16:23:01 2009
@@ -26,10 +26,8 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
/**
* Tests that reference counting works correctly with AMQMessage and the message store
@@ -83,15 +81,12 @@
};
- final long messageId = _store.getNewMessageId();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- MessageMetaData mmd = messageHandle.setPublishAndContentHeaderBody(info, chb);
- _store.storeMessageMetaData(messageId, mmd);
+ MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ StoredMessage storedMessage = _store.addMessage(mmd);
- AMQMessage message = new AMQMessage(messageHandle,
- chb, chb.bodySize,info);
+ AMQMessage message = new AMQMessage(storedMessage);
message = message.takeReference();
@@ -99,9 +94,9 @@
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertEquals(1, _store.getMessageCount());
message.decrementReference();
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertEquals(1, _store.getMessageCount());
}
private ContentHeaderBody createPersistentContentHeader()
@@ -145,28 +140,24 @@
}
};
- final Long messageId = _store.getNewMessageId();
final ContentHeaderBody chb = createPersistentContentHeader();
- AMQMessageHandle messageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, _store, true);
- MessageMetaData mmd = messageHandle.setPublishAndContentHeaderBody(info, chb);
- _store.storeMessageMetaData(messageId, mmd);
+ MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ StoredMessage storedMessage = _store.addMessage(mmd);
+
+ AMQMessage message = new AMQMessage(storedMessage);
+
- AMQMessage message = new AMQMessage(messageHandle,
- chb, chb.bodySize,
- info);
-
-
message = message.takeReference();
// we call routing complete to set up the handle
// message.routingComplete(_store, _storeContext, new MessageHandleFactory());
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertEquals(1, _store.getMessageCount());
message = message.takeReference();
message.decrementReference();
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertEquals(1, _store.getMessageCount());
}
public static junit.framework.Test suite()
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Tue Oct 20 16:23:01 2009
@@ -22,14 +22,15 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashMap;
import java.util.List;
+import java.nio.ByteBuffer;
/**
* Adds some extra methods to the memory message store for testing purposes.
@@ -39,6 +40,7 @@
MemoryMessageStore _mms = null;
private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
+ private AtomicInteger _messageCount = new AtomicInteger(0);
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
@@ -47,46 +49,111 @@
public TestableMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+
+ }
+
+
+
+
+ @Override
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
+ {
+ return new TestableStoredMessage(super.addMessage(metaData));
+ }
+
+ public int getMessageCount()
+ {
+ return _messageCount.get();
}
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ private class TestableTransaction implements Transaction
{
- if (_mms != null)
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
{
- return _mms._metaDataMap;
+ getMessages().put(messageId, (AMQQueue)queue);
}
- else
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
{
- return _metaDataMap;
+ getMessages().remove(messageId);
}
- }
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
- {
- if (_mms != null)
+ public void commitTran() throws AMQException
{
- return _mms._contentBodyMap;
}
- else
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+ }
+
+ public void abortTran() throws AMQException
{
- return _contentBodyMap;
}
- }
-
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
- {
- getMessages().put(messageId, queue);
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+
+ @Override
+ public Transaction newTransaction()
{
- getMessages().remove(messageId);
+ return new TestableTransaction();
}
public HashMap<Long, AMQQueue> getMessages()
{
return _messages;
}
+
+ private class TestableStoredMessage implements StoredMessage
+ {
+ private final StoredMessage _storedMessage;
+
+ public TestableStoredMessage(StoredMessage storedMessage)
+ {
+ _messageCount.incrementAndGet();
+ _storedMessage = storedMessage;
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ return _storedMessage.getMetaData();
+ }
+
+ public long getMessageNumber()
+ {
+ return _storedMessage.getMessageNumber();
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ _storedMessage.addContent(offsetInMessage, src);
+ }
+
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return _storedMessage.getContent(offsetInMessage, dst);
+ }
+
+ public StoreFuture flushToStore()
+ {
+ return _storedMessage.flushToStore();
+ }
+
+ public void remove()
+ {
+ _storedMessage.remove();
+ _messageCount.decrementAndGet();
+ }
+ }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Tue Oct 20 16:23:01 2009
@@ -127,7 +127,17 @@
public void confirmAutoClose()
{
-
+
+ }
+
+ public void set(String key, Object value)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object get(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
}
public boolean isAutoClose()
@@ -197,6 +207,10 @@
this.queue = queue;
}
+ public void setNoLocal(boolean noLocal)
+ {
+ }
+
public void setStateListener(StateListener listener)
{
this._listener = listener;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Tue Oct 20 16:23:01 2009
@@ -31,7 +31,6 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
@@ -94,7 +93,7 @@
protected void checkStoreContents(int messageCount)
{
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size());
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageCount());
//The above publish message is sufficiently small not to fit in the header so no Body is required.
//assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
@@ -111,11 +110,7 @@
e.printStackTrace();
fail(e.getMessage());
}
- catch (ConsumerTagNotUniqueException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
//Keep the compiler happy
return null;
}
@@ -134,11 +129,7 @@
e.printStackTrace();
fail(e.getMessage());
}
- catch (ConsumerTagNotUniqueException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+
//Keep the compiler happy
return null;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,8 +35,9 @@
import org.apache.qpid.server.security.access.plugins.AllowAll;
import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Arrays;
import java.util.Collection;
@@ -75,7 +76,7 @@
_virtualHostRegistry = new VirtualHostRegistry(this);
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
- VirtualHost dummyHost = new VirtualHost(hostConfig);
+ VirtualHost dummyHost = new VirtualHostImpl(hostConfig);
_virtualHostRegistry.registerVirtualHost(dummyHost);
_virtualHostRegistry.setDefaultVirtualHostName("test");
_pluginManager = new PluginManager("");
@@ -94,7 +95,7 @@
{
try
{
- super.close();
+ super.close();
}
finally
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,8 +35,9 @@
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.RootMessageLoggerImpl;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.TestLogActor;
@@ -60,7 +61,7 @@
private ServerConfiguration _config;
-
+
public TestApplicationRegistry() throws ConfigurationException
{
super(new ServerConfiguration(new PropertiesConfiguration()));
@@ -96,10 +97,10 @@
_messageStore = new TestableMemoryMessageStore();
_virtualHostRegistry = new VirtualHostRegistry(this);
-
+
PropertiesConfiguration vhostProps = new PropertiesConfiguration();
VirtualHostConfiguration hostConfig = new VirtualHostConfiguration("test", vhostProps);
- _vHost = new VirtualHost(hostConfig, _messageStore);
+ _vHost = new VirtualHostImpl(hostConfig, _messageStore);
_virtualHostRegistry.registerVirtualHost(_vHost);
@@ -152,7 +153,7 @@
CurrentActor.remove();
}
}
-
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java Tue Oct 20 16:23:01 2009
@@ -35,9 +35,6 @@
super(session, channelId, messageStore);
}
- public Subscription getSubscription(AMQShortString subscription)
- {
- return _tag2SubscriptionMap.get(subscription);
- }
-
+
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java Tue Oct 20 16:23:01 2009
@@ -108,6 +108,8 @@
createMessageClass("ManagementConsole", "MNG");
createMessageClass("VirtualHost", "VHT");
createMessageClass("MessageStore", "MST");
+ createMessageClass("ConfigStore", "CFG");
+ createMessageClass("TransactionLog", "TXN");
createMessageClass("Connection", "CON");
createMessageClass("Channel", "CHN");
createMessageClass("Queue", "QUE");
@@ -465,4 +467,4 @@
super(message);
}
}
-}
\ No newline at end of file
+}
Modified: qpid/branches/java-broker-0-10/qpid/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/build.deps?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/build.deps (original)
+++ qpid/branches/java-broker-0-10/qpid/java/build.deps Tue Oct 20 16:23:01 2009
@@ -82,7 +82,7 @@
broker.libs=${common.libs} ${commons-cli} ${commons-logging} ${log4j} \
${slf4j-log4j} ${xalan} ${felix.libs} ${derby-db}
-broker-plugins.libs=${common.libs} ${felix.libs}
+broker-plugins.libs=${common.libs} ${felix.libs} ${log4j}
management-client.libs=${jsp.libs} ${log4j} ${slf4j-log4j} ${slf4j-api} ${commons-pool} ${geronimo-servlet} ${muse.libs} ${javassist} ${xalan} ${mina-core} ${mina-filter-ssl}
management-agent.libs=${client.libs} ${commons-logging} ${geronimo-jms}
Modified: qpid/branches/java-broker-0-10/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Tue Oct 20 16:23:01 2009
@@ -468,7 +468,7 @@
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
throws JMSException
{
- String rk = "";
+ String rk = null;
boolean res;
if (bindingKeys != null && bindingKeys.length>0)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/Composite.tpl?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/Composite.tpl (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/Composite.tpl Tue Oct 20 16:23:01 2009
@@ -127,7 +127,12 @@
${
for f in fields:
if f.option: continue
- out(" $(f.set)($(f.name));\n")
+ if f.ref_type != f.type:
+ out(" $(f.set)($(f.name));\n")
+ else:
+ out(" if($(f.name) != null) {\n")
+ out(" $(f.set)($(f.name));\n")
+ out(" }\n")
if segments:
out(" setHeader(header);\n")
Modified: qpid/branches/java-broker-0-10/qpid/java/common/genutil.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/genutil.py?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/genutil.py (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/genutil.py Tue Oct 20 16:23:01 2009
@@ -198,6 +198,7 @@
self.read = "dec.read%s()" % self.coder
self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name)
self.type = jtype(self.type_node)
+ self.ref_type = jref(self.type)
self.default = DEFAULTS.get(self.type, "null")
self.has = camel(1, "has", self.name)
self.get = camel(1, "get", self.name)
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Tue Oct 20 16:23:01 2009
@@ -23,10 +23,14 @@
import org.apache.qpid.framing.AMQBody;
+import java.nio.ByteBuffer;
+
public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
{
AMQBody convertToBody(ContentChunk contentBody);
ContentChunk convertToContentChunk(AMQBody body);
void configure();
+
+ AMQBody convertToBody(ByteBuffer buf);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Tue Oct 20 16:23:01 2009
@@ -72,6 +72,11 @@
}
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ {
+ return new ContentBody(ByteBuffer.wrap(buf));
+ }
+
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
final BasicPublishBody publishBody = ((BasicPublishBody) methodBody);
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Tue Oct 20 16:23:01 2009
@@ -80,6 +80,11 @@
_basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
}
+
+ public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ {
+ return new ContentBody(ByteBuffer.wrap(buf));
+ }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue Oct 20 16:23:01 2009
@@ -493,13 +493,28 @@
}
}
- final private boolean isFull(int id)
+ protected boolean isFull(int id)
{
- return id - maxComplete >= commands.length || commandBytes >= byteLimit;
+ return isCommandsFull(id) || isBytesFull();
+ }
+
+ protected boolean isBytesFull()
+ {
+ return commandBytes >= byteLimit;
+ }
+
+ protected boolean isCommandsFull(int id)
+ {
+ return id - maxComplete >= commands.length;
}
public void invoke(Method m)
{
+ invoke(m,(Runnable)null);
+ }
+
+ public void invoke(Method m, Runnable postIdSettingAction)
+ {
if (m.getEncodedTrack() == Frame.L4)
{
if (m.hasPayload())
@@ -563,6 +578,10 @@
int next;
next = commandsOut++;
m.setId(next);
+ if(postIdSettingAction != null)
+ {
+ postIdSettingAction.run();
+ }
if (isFull(next))
{
@@ -625,6 +644,7 @@
m.setSync(true);
}
needSync = !m.isSync();
+
try
{
send(m);
@@ -649,7 +669,7 @@
// flush every 64K commands to avoid ambiguity on
// wraparound
- if ((next % 65536) == 0)
+ if (shouldIssueFlush(next))
{
try
{
@@ -677,6 +697,11 @@
}
}
+ protected boolean shouldIssueFlush(int next)
+ {
+ return (next % 65536) == 0;
+ }
+
public void sync()
{
sync(timeout);
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java Tue Oct 20 16:23:01 2009
@@ -229,7 +229,7 @@
if (_socketConnector instanceof SocketConnector)
{
((SocketConnector) _socketConnector).setWorkerTimeout(0);
- }
+ }
ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
future.join();
@@ -279,7 +279,10 @@
public void send(ByteBuffer msg)
{
- _lastWriteFuture = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg));
+ org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity());
+ minaBuf.put(msg);
+ minaBuf.flip();
+ _lastWriteFuture = _ioSession.write(minaBuf);
}
public void setIdleTimeout(long l)
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Tue Oct 20 16:23:01 2009
@@ -113,12 +113,12 @@
for (int loops = 0; (msg < MSG_COUNT) || (loops < MAX_LOOPS); loops++)
{
- if (_consumer1.receive(100) != null)
+ if (_consumer1.receive(1000) != null)
{
msg++;
}
- if (_consumer2.receive(100) != null)
+ if (_consumer2.receive(1000) != null)
{
msg++;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -60,6 +60,11 @@
public void setQueue(AMQQueue queue, boolean exclusive)
{
+
+ }
+
+ public void setNoLocal(boolean noLocal)
+ {
}
@@ -117,7 +122,7 @@
{
//To change body of implemented methods use File | Settings | File Templates.
}
-
+
public State getState()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -142,7 +147,7 @@
{
return null;
}
-
+
public void start()
{
//no-op
@@ -168,6 +173,16 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void set(String key, Object value)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Object get(String key)
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public LogActor getLogActor()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Tue Oct 20 16:23:01 2009
@@ -25,13 +25,10 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.logging.LogSubject;
import java.util.HashMap;
import java.util.Iterator;
@@ -49,14 +46,21 @@
private static final String POST = "post";
private String DEFAULT_DELAY = "default";
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ // ***** MessageStore Interface.
+
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
{
- _logger.info("Starting SlowMessageStore on Virtualhost:" + virtualHost.getName());
- Configuration delays = config.getStoreConfiguration().subset(DELAYS);
+ //To change body of implemented methods use File | Settings | File Templates.
+
+ _logger.info("Starting SlowMessageStore on Virtualhost:" + name);
+ Configuration delays = config.subset(DELAYS);
configureDelays(delays);
- String messageStoreClass = config.getStoreConfiguration().getString("realStore");
+ String messageStoreClass = config.getString("realStore");
if (delays.containsKey(DEFAULT_DELAY))
{
@@ -75,11 +79,11 @@
" does not.");
}
_realStore = (MessageStore) o;
- _realStore.configure(virtualHost, base + ".store", config);
+ _realStore.configureConfigStore(name, recoveryHandler, config, logSubject);
}
else
{
- _realStore.configure(virtualHost, base + ".store", config);
+ _realStore.configureConfigStore(name, recoveryHandler, config, logSubject);
}
}
@@ -135,7 +139,7 @@
}
long slept = (System.nanoTime() - start) / 1000000;
-
+
if (slept >= delay)
{
_logger.info("Done sleep for:" + slept+":"+delay);
@@ -148,7 +152,14 @@
}
}
- // ***** MessageStore Interface.
+
+ public void configureMessageStore(String name,
+ MessageStoreRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
+ {
+ _realStore.configureMessageStore(name, recoveryHandler, config, logSubject);
+ }
public void close() throws Exception
{
@@ -157,13 +168,12 @@
doPostDelay("close");
}
- public void removeMessage(Long messageId) throws AMQException
+ public <M extends StorableMessageMetaData> StoredMessage<M> addMessage(M metaData)
{
- doPreDelay("removeMessage");
- _realStore.removeMessage(messageId);
- doPostDelay("removeMessage");
+ return _realStore.addMessage(metaData);
}
+
public void createExchange(Exchange exchange) throws AMQException
{
doPreDelay("createExchange");
@@ -211,127 +221,93 @@
doPostDelay("removeQueue");
}
- public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
+ public void configureTransactionLog(String name,
+ TransactionLogRecoveryHandler recoveryHandler,
+ Configuration storeConfiguration, LogSubject logSubject)
+ throws Exception
{
- doPreDelay("enqueueMessage");
- _realStore.enqueueMessage(context, queue, messageId);
- doPostDelay("enqueueMessage");
+ _realStore.configureTransactionLog(name, recoveryHandler, storeConfiguration, logSubject);
}
- public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
- {
- doPreDelay("dequeueMessage");
- _realStore.dequeueMessage(context, queue, messageId);
- doPostDelay("dequeueMessage");
- }
-
- public void beginTran(StoreContext context) throws AMQException
+ public Transaction newTransaction()
{
doPreDelay("beginTran");
- _realStore.beginTran(context);
+ Transaction txn = new SlowTransaction(_realStore.newTransaction());
doPostDelay("beginTran");
+ return txn;
}
- public void commitTran(StoreContext context) throws AMQException
- {
- doPreDelay("commitTran");
- _realStore.commitTran(context);
- doPostDelay("commitTran");
- }
- public StoreFuture commitTranAsync(StoreContext context) throws AMQException
+ public boolean isPersistent()
{
- commitTran(context);
- return new StoreFuture()
- {
- public boolean isComplete()
- {
- return true;
- }
-
- public void waitForCompletion()
- {
-
- }
- };
-
+ return _realStore.isPersistent();
}
- public void abortTran(StoreContext context) throws AMQException
+ public void storeMessageHeader(Long messageNumber, ServerMessage message)
{
- doPreDelay("abortTran");
- _realStore.abortTran(context);
- doPostDelay("abortTran");
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public boolean inTran(StoreContext context)
+ public void storeContent(Long messageNumber, long offset, ByteBuffer body)
{
- doPreDelay("inTran");
- boolean b = _realStore.inTran(context);
- doPostDelay("inTran");
- return b;
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public Long getNewMessageId()
+ public ServerMessage getMessage(Long messageNumber)
{
- doPreDelay("getNewMessageId");
- Long l = _realStore.getNewMessageId();
- doPostDelay("getNewMessageId");
- return l;
+ return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void storeContentBodyChunk(
- Long messageId,
- int index,
- ContentChunk contentBody,
- boolean lastContentBody) throws AMQException
+ private class SlowTransaction implements Transaction
{
- doPreDelay("storeContentBodyChunk");
- _realStore.storeContentBodyChunk(messageId, index, contentBody, lastContentBody);
- doPostDelay("storeContentBodyChunk");
- }
+ private final Transaction _underlying;
- public void storeMessageMetaData(Long messageId, MessageMetaData messageMetaData) throws AMQException
- {
- doPreDelay("storeMessageMetaData");
- _realStore.storeMessageMetaData(messageId, messageMetaData);
- doPostDelay("storeMessageMetaData");
- }
+ private SlowTransaction(Transaction underlying)
+ {
+ _underlying = underlying;
+ }
- public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
- {
- doPreDelay("getMessageMetaData");
- MessageMetaData mmd = _realStore.getMessageMetaData(messageId);
- doPostDelay("getMessageMetaData");
- return mmd;
- }
+ public void enqueueMessage(TransactionLogResource queue, Long messageId)
+ throws AMQException
+ {
+ doPreDelay("enqueueMessage");
+ _underlying.enqueueMessage(queue, messageId);
+ doPostDelay("enqueueMessage");
+ }
- public ContentChunk getContentBodyChunk(Long messageId, int index) throws AMQException
- {
- doPreDelay("getContentBodyChunk");
- ContentChunk c = _realStore.getContentBodyChunk(messageId, index);
- doPostDelay("getContentBodyChunk");
- return c;
- }
+ public void dequeueMessage(TransactionLogResource queue, Long messageId)
+ throws AMQException
+ {
+ doPreDelay("dequeueMessage");
+ _underlying.dequeueMessage(queue, messageId);
+ doPostDelay("dequeueMessage");
+ }
- public boolean isPersistent()
- {
- return _realStore.isPersistent();
- }
+ public void commitTran()
+ throws AMQException
+ {
+ doPreDelay("commitTran");
+ _underlying.commitTran();
+ doPostDelay("commitTran");
+ }
- public void storeMessageHeader(Long messageNumber, ServerMessage message)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ public StoreFuture commitTranAsync()
+ throws AMQException
+ {
+ doPreDelay("commitTran");
+ StoreFuture future = _underlying.commitTranAsync();
+ doPostDelay("commitTran");
+ return future;
+ }
- public void storeContent(Long messageNumber, long offset, ByteBuffer body)
- {
- //To change body of implemented methods use File | Settings | File Templates.
+ public void abortTran()
+ throws AMQException
+ {
+ doPreDelay("abortTran");
+ _underlying.abortTran();
+ doPostDelay("abortTran");
+ }
}
- public ServerMessage getMessage(Long messageNumber)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java Tue Oct 20 16:23:01 2009
@@ -81,9 +81,14 @@
assertTrue(e.hasMoreElements());
+ int i = 0;
while (e.hasMoreElements())
{
e.nextElement();
+ if(++i > 1)
+ {
+ fail("Too many elements to browse!");
+ }
}
browser.close();
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java Tue Oct 20 16:23:01 2009
@@ -79,7 +79,7 @@
* This test sends x messages and receives them with an async consumer.
* Waits for all messages to be received or for 60 s
* and checks whether the queue is empty.
- *
+ *
* @throws Exception
*/
public void testDupsOK() throws Exception
@@ -93,7 +93,7 @@
assertEquals("The queue should have msgs at start", MSG_COUNT, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue));
- clientConnection.start();
+ clientConnection.start();
consumer.setMessageListener(new MessageListener()
{
@@ -110,7 +110,7 @@
if (message instanceof TextMessage)
{
try
- {
+ {
if (message.getIntProperty("count") == MSG_COUNT)
{
try
@@ -156,7 +156,11 @@
// before the dispatcher has sent the ack back to the broker.
consumer.close();
- assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession).getQueueDepth((AMQDestination) _queue));
+ clientSession.close();
+
+ final Session clientSession2 = clientConnection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ assertEquals("The queue should have 0 msgs left", 0, ((AMQSession) clientSession2).getQueueDepth((AMQDestination) _queue));
clientConnection.close();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java Tue Oct 20 16:23:01 2009
@@ -79,16 +79,22 @@
}
// Test boundary of 1 packet to 2 packets
- public void test64kminus1()
+ public void test64kminus9()
{
- checkLargeMessage((64 * 1024) - 1);
+ checkLargeMessage((64 * 1024) - 9);
}
- public void test64k()
+ public void test64kminus8()
{
- checkLargeMessage(64 * 1024);
+ checkLargeMessage((64 * 1024)-8);
}
+ public void test64kminus7()
+ {
+ checkLargeMessage((64 * 1024)-7);
+ }
+
+
public void test64kplus1()
{
checkLargeMessage((64 * 1024) + 1);
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Tue Oct 20 16:23:01 2009
@@ -50,7 +50,8 @@
protected final String queue = "direct://amq.direct//message-requeue-test-queue";
protected String payload = "Message:";
- protected final String BROKER = "vm://:1";
+ //protected final String BROKER = "vm://:1";
+ protected final String BROKER = "tcp://127.0.0.1:5672";
private boolean testReception = true;
private long[] receieved = new long[numTestMessages + 1];
Modified: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java Tue Oct 20 16:23:01 2009
@@ -34,7 +34,7 @@
* create and register a durable subscriber then close it
* create a publisher and send a persistant message followed by a non persistant message
* crash and restart the broker
- * recreate the durable subscriber and check that only the first message is received
+ * recreate the durable subscriber and check that only the first message is received
*/
public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception
{
Added: qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.0.10.testprofile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.0.10.testprofile?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.0.10.testprofile (added)
+++ qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.0.10.testprofile Tue Oct 20 16:23:01 2009
@@ -0,0 +1,8 @@
+broker.language=java
+broker.version=0-10
+broker=${project.root}/build/bin/qpid-server -p @PORT -m @MPORT -c @CONFIG_FILE -l ${test.profiles}/log4j-test.xml
+broker.clean=${test.profiles}/clean-dir ${build.data} ${project.root}/build/work/derbyDB
+broker.ready=BRK-1004
+broker.stopped=Exception
+
+profile.excludes=08TransientExcludes 08StandaloneExcludes 010Excludes 010TransientExcludes
Propchange: qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.0.10.testprofile
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org