You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/06 00:16:52 UTC
svn commit: r493231 [2/2] - in
/incubator/qpid/branches/new_persistence/java: broker/etc/
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/j...
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Jan 5 15:16:50 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import javax.management.JMException;
import java.util.LinkedList;
@@ -41,9 +42,11 @@
private AMQQueueMBean _queueMBean;
private QueueRegistry _queueRegistry;
private MessageStore _messageStore = new SkeletonMessageStore();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, null,
+ private StoreContext _storeContext = new StoreContext();
+ private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ new HashSet<Long>());
private MockProtocolSession _protocolSession;
private AMQChannel _channel;
@@ -140,8 +143,8 @@
AMQMessage msg = message(false);
long id = msg.getMessageId();
- _queue.clearQueue();
- _queue.process(msg);
+ _queue.clearQueue(_storeContext);
+ _queue.process(_storeContext, msg);
_queueMBean.viewMessageContent(id);
try
{
@@ -161,7 +164,7 @@
BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
publish.immediate = immediate;
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = 1000; // in bytes
+ contentHeaderBody.bodySize = 1000; // in bytes
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
}
@@ -184,7 +187,7 @@
}
for (int i = 0; i < messageCount; i++)
{
- _queue.process(messages[i]);
+ _queue.process(_storeContext, messages[i]);
}
}
}
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java Fri Jan 5 15:16:50 2007
@@ -32,6 +32,7 @@
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -53,6 +54,8 @@
private TestableMemoryMessageStore _messageStore;
+ private StoreContext _storeContext = new StoreContext();
+
private AMQChannel _channel;
private SubscriptionSet _subscriptionManager;
@@ -82,7 +85,7 @@
private void publishMessages(int count, boolean persistent) throws AMQException
{
- TransactionalContext txnContext = new NonTransactionalContext(_messageStore, null,
+ TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
new LinkedList<RequiredDeliveryException>(),
new HashSet<Long>());
MessageHandleFactory factory = new MessageHandleFactory();
@@ -111,7 +114,7 @@
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
msg.incrementReference();
- msg.routingComplete(_messageStore, factory);
+ msg.routingComplete(_messageStore, _storeContext, factory);
// we manually send the message to the subscription
_subscription.send(msg, _queue);
}
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java Fri Jan 5 15:16:50 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -186,7 +186,7 @@
AMQMessage msg = nextMessage();
if (msg != null)
{
- _deliveryMgr.deliver(toString(), msg);
+ _deliveryMgr.deliver(null, toString(), msg);
}
}
}
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Fri Jan 5 15:16:50 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import junit.framework.TestSuite;
@@ -29,6 +30,7 @@
{
protected final SubscriptionSet _subscriptions = new SubscriptionSet();
protected DeliveryManager _mgr;
+ protected StoreContext _storeContext = new StoreContext();
public DeliveryManagerTest() throws Exception
{
@@ -45,7 +47,7 @@
for (int i = 0; i < batch; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, "Me", messages[i]);
}
SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
@@ -55,7 +57,7 @@
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, "Me", messages[i]);
}
assertTrue(s1.getMessages().isEmpty());
@@ -93,7 +95,7 @@
for (int i = 0; i < batch; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, "Me", messages[i]);
}
assertEquals(batch, s1.getMessages().size());
@@ -107,7 +109,7 @@
s1.setSuspended(true);
for (int i = batch; i < messages.length; i++)
{
- _mgr.deliver("Me", messages[i]);
+ _mgr.deliver(_storeContext, "Me", messages[i]);
}
_mgr.processAsync(new OnCurrentThreadExecutor());
@@ -129,7 +131,7 @@
try
{
AMQMessage msg = message(true);
- _mgr.deliver("Me", msg);
+ _mgr.deliver(_storeContext, "Me", msg);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
@@ -151,7 +153,7 @@
_subscriptions.addSubscriber(s);
s.setSuspended(true);
AMQMessage msg = message(true);
- _mgr.deliver("Me", msg);
+ _mgr.deliver(_storeContext, "Me", msg);
msg.checkDeliveredToConsumer();
fail("expected exception did not occur");
}
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java Fri Jan 5 15:16:50 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -40,7 +41,9 @@
{
private final MessageStore _messageStore = new SkeletonMessageStore();
- private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null,
+ private final StoreContext _storeContext = new StoreContext();
+
+ private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
new LinkedList<RequiredDeliveryException>(),
new HashSet<Long>());
@@ -61,7 +64,7 @@
BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
publish.immediate = immediate;
return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
- new ContentHeaderBody());
+ new ContentHeaderBody());
}
}
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Fri Jan 5 15:16:50 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,9 +48,9 @@
public void close() throws Exception
{
- }
+ }
- public void removeMessage(long messageId)
+ public void removeMessage(StoreContext s, long messageId)
{
}
@@ -62,28 +62,28 @@
{
}
- public void enqueueMessage(String name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext s, String name, long messageId) throws AMQException
{
}
- public void dequeueMessage(String name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext s, String name, long messageId) throws AMQException
{
}
- public void beginTran() throws AMQException
+ public void beginTran(StoreContext s) throws AMQException
{
}
- public boolean inTran()
+ public boolean inTran(StoreContext sc)
{
return false;
}
-
- public void commitTran() throws AMQException
+
+ public void commitTran(StoreContext storeContext) throws AMQException
{
}
- public void abortTran() throws AMQException
+ public void abortTran(StoreContext storeContext) throws AMQException
{
}
@@ -97,12 +97,12 @@
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException
{
}
- public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException
+ public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException
{
}
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Jan 5 15:16:50 2007
@@ -22,9 +22,9 @@
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -36,6 +36,8 @@
{
private TestableMemoryMessageStore _store;
+ private StoreContext _storeContext = new StoreContext();
+
protected void setUp() throws Exception
{
super.setUp();
@@ -48,14 +50,16 @@
public void testMessageGetsRemoved() throws AMQException
{
createPersistentContentHeader();
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(),
- new NonTransactionalContext(_store, null, null, null),
+ // TODO: fix hardcoded protocol version data
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
+ (byte)0),
+ new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
// we call routing complete to set up the handle
- message.routingComplete(_store, new MessageHandleFactory());
+ message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertTrue(_store.getMessageMetaDataMap().size() == 1);
- message.decrementReference();
+ message.decrementReference(_storeContext);
assertTrue(_store.getMessageMetaDataMap().size() == 0);
}
@@ -70,15 +74,17 @@
public void testMessageRemains() throws AMQException
{
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(),
- new NonTransactionalContext(_store, null, null, null),
+ // TODO: fix hardcoded protocol version data
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
+ (byte)0),
+ new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
// we call routing complete to set up the handle
- message.routingComplete(_store, new MessageHandleFactory());
+ message.routingComplete(_store, _storeContext, new MessageHandleFactory());
assertTrue(_store.getMessageMetaDataMap().size() == 1);
message.incrementReference();
- message.decrementReference();
+ message.decrementReference(_storeContext);
assertTrue(_store.getMessageMetaDataMap().size() == 1);
}
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java Fri Jan 5 15:16:50 2007
@@ -24,6 +24,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
@@ -43,7 +44,7 @@
buffer.enlist(op);
buffer.enlist(new MockOp().expectPrepare().expectCommit());
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
@@ -58,7 +59,7 @@
buffer.enlist(new MockOp().expectRollback());
buffer.enlist(new MockOp().expectRollback());
- buffer.rollback();
+ buffer.rollback(null);
validateOps();
store.validate();
@@ -77,7 +78,7 @@
buffer.enlist(new FailedPrepare());
buffer.enlist(new MockOp());
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
}
@@ -95,7 +96,7 @@
buffer.enlist(new StoreMessageOperation(store));
buffer.enlist(new TxnTester(store));
- buffer.commit();
+ buffer.commit(null);
validateOps();
store.validate();
}
@@ -127,12 +128,12 @@
ops.add(this);
}
- public void prepare()
+ public void prepare(StoreContext context)
{
assertEquals(expected.removeLast(), PREPARE);
}
- public void commit()
+ public void commit(StoreContext context)
{
assertEquals(expected.removeLast(), COMMIT);
}
@@ -142,7 +143,7 @@
assertEquals(expected.removeLast(), UNDO_PREPARE);
}
- public void rollback()
+ public void rollback(StoreContext context)
{
assertEquals(expected.removeLast(), ROLLBACK);
}
@@ -249,16 +250,16 @@
class NullOp implements TxnOp
{
- public void prepare() throws AMQException
+ public void prepare(StoreContext context) throws AMQException
{
}
- public void commit()
+ public void commit(StoreContext context)
{
}
public void undoPrepare()
{
}
- public void rollback()
+ public void rollback(StoreContext context)
{
}
}
@@ -275,6 +276,8 @@
{
private final MessageStore store;
+ private final StoreContext context = new StoreContext();
+
TxnTester(MessageStore store)
{
this.store = store;
@@ -282,12 +285,12 @@
public void prepare() throws AMQException
{
- assertTrue("Expected prepare to be performed under txn", store.inTran());
+ assertTrue("Expected prepare to be performed under txn", store.inTran(context));
}
public void commit()
{
- assertTrue("Expected commit not to be performed under txn", !store.inTran());
+ assertTrue("Expected commit not to be performed under txn", !store.inTran(context));
}
}