You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/06 00:16:52 UTC

svn commit: r493231 [2/2] - in /incubator/qpid/branches/new_persistence/java: broker/etc/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/j...

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Jan  5 15:16:50 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.StoreContext;
 
 import javax.management.JMException;
 import java.util.LinkedList;
@@ -41,9 +42,11 @@
     private AMQQueueMBean _queueMBean;
     private QueueRegistry _queueRegistry;
     private MessageStore _messageStore = new SkeletonMessageStore();
-    private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, null,
+    private StoreContext _storeContext = new StoreContext();
+    private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+                                                                                     null,
                                                                                      new LinkedList<RequiredDeliveryException>(),
-                                                                                     new HashSet<Long>()); 
+                                                                                     new HashSet<Long>());
     private MockProtocolSession _protocolSession;
     private AMQChannel _channel;
 
@@ -140,8 +143,8 @@
 
         AMQMessage msg = message(false);
         long id = msg.getMessageId();
-        _queue.clearQueue();
-        _queue.process(msg);
+        _queue.clearQueue(_storeContext);
+        _queue.process(_storeContext, msg);
         _queueMBean.viewMessageContent(id);
         try
         {
@@ -161,7 +164,7 @@
         BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
         publish.immediate = immediate;
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
-        contentHeaderBody.bodySize = 1000;   // in bytes       
+        contentHeaderBody.bodySize = 1000;   // in bytes
         return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
     }
 
@@ -184,7 +187,7 @@
         }
         for (int i = 0; i < messageCount; i++)
         {
-            _queue.process(messages[i]);
+            _queue.process(_storeContext, messages[i]);
         }
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java Fri Jan  5 15:16:50 2007
@@ -32,6 +32,7 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -53,6 +54,8 @@
 
     private TestableMemoryMessageStore _messageStore;
 
+    private StoreContext _storeContext = new StoreContext();
+
     private AMQChannel _channel;
 
     private SubscriptionSet _subscriptionManager;
@@ -82,7 +85,7 @@
 
     private void publishMessages(int count, boolean persistent) throws AMQException
     {
-        TransactionalContext txnContext = new NonTransactionalContext(_messageStore, null,
+        TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
                                                                       new LinkedList<RequiredDeliveryException>(),
                                                                       new HashSet<Long>());
         MessageHandleFactory factory = new MessageHandleFactory();
@@ -111,7 +114,7 @@
             // the reference is normally incremented. The test is easier to construct if we have direct access to the
             // subscription
             msg.incrementReference();
-            msg.routingComplete(_messageStore, factory);
+            msg.routingComplete(_messageStore, _storeContext, factory);
             // we manually send the message to the subscription
             _subscription.send(msg, _queue);
         }

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java Fri Jan  5 15:16:50 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -186,7 +186,7 @@
             AMQMessage msg = nextMessage();
             if (msg != null)
             {
-                _deliveryMgr.deliver(toString(), msg);
+                _deliveryMgr.deliver(null, toString(), msg);
             }
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java Fri Jan  5 15:16:50 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.AMQException;
 
 import junit.framework.TestSuite;
@@ -29,6 +30,7 @@
 {
     protected final SubscriptionSet _subscriptions = new SubscriptionSet();
     protected DeliveryManager _mgr;
+    protected StoreContext _storeContext = new StoreContext();
 
     public DeliveryManagerTest() throws Exception
     {
@@ -45,7 +47,7 @@
 
         for (int i = 0; i < batch; i++)
         {
-            _mgr.deliver("Me", messages[i]);
+            _mgr.deliver(_storeContext, "Me", messages[i]);
         }
 
         SubscriptionTestHelper s1 = new SubscriptionTestHelper("1");
@@ -55,7 +57,7 @@
 
         for (int i = batch; i < messages.length; i++)
         {
-            _mgr.deliver("Me", messages[i]);
+            _mgr.deliver(_storeContext, "Me", messages[i]);
         }
 
         assertTrue(s1.getMessages().isEmpty());
@@ -93,7 +95,7 @@
 
         for (int i = 0; i < batch; i++)
         {
-            _mgr.deliver("Me", messages[i]);
+            _mgr.deliver(_storeContext, "Me", messages[i]);
         }
 
         assertEquals(batch, s1.getMessages().size());
@@ -107,7 +109,7 @@
         s1.setSuspended(true);
         for (int i = batch; i < messages.length; i++)
         {
-            _mgr.deliver("Me", messages[i]);
+            _mgr.deliver(_storeContext, "Me", messages[i]);
         }
 
         _mgr.processAsync(new OnCurrentThreadExecutor());
@@ -129,7 +131,7 @@
         try
         {
             AMQMessage msg = message(true);
-            _mgr.deliver("Me", msg);
+            _mgr.deliver(_storeContext, "Me", msg);
             msg.checkDeliveredToConsumer();
             fail("expected exception did not occur");
         }
@@ -151,7 +153,7 @@
             _subscriptions.addSubscriber(s);
             s.setSuspended(true);
             AMQMessage msg = message(true);
-            _mgr.deliver("Me", msg);
+            _mgr.deliver(_storeContext, "Me", msg);
             msg.checkDeliveredToConsumer();
             fail("expected exception did not occur");
         }

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java Fri Jan  5 15:16:50 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.util.TestApplicationRegistry;
 import org.apache.qpid.server.txn.TransactionalContext;
@@ -40,7 +41,9 @@
 {
     private final MessageStore _messageStore = new SkeletonMessageStore();
 
-    private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, null,
+    private final StoreContext _storeContext = new StoreContext();
+
+    private final TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
                                                                                  new LinkedList<RequiredDeliveryException>(),
                                                                                  new HashSet<Long>());
 
@@ -61,7 +64,7 @@
         BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
         publish.immediate = immediate;
         return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
-                              new ContentHeaderBody());        
+                              new ContentHeaderBody());
     }
 
 }

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Fri Jan  5 15:16:50 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,9 +48,9 @@
 
     public void close() throws Exception
     {
-    }    
+    }
 
-    public void removeMessage(long messageId)
+    public void removeMessage(StoreContext s, long messageId)
     {
     }
 
@@ -62,28 +62,28 @@
     {
     }
 
-    public void enqueueMessage(String name, long messageId) throws AMQException
+    public void enqueueMessage(StoreContext s, String name, long messageId) throws AMQException
     {
     }
 
-    public void dequeueMessage(String name, long messageId) throws AMQException
+    public void dequeueMessage(StoreContext s, String name, long messageId) throws AMQException
     {
     }
 
-    public void beginTran() throws AMQException
+    public void beginTran(StoreContext s) throws AMQException
     {
     }
 
-    public boolean inTran()
+    public boolean inTran(StoreContext sc)
     {
         return false;
     }
-    
-    public void commitTran() throws AMQException
+
+    public void commitTran(StoreContext storeContext) throws AMQException
     {
     }
 
-    public void abortTran() throws AMQException
+    public void abortTran(StoreContext storeContext) throws AMQException
     {
     }
 
@@ -97,12 +97,12 @@
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(long messageId, int index, ContentBody contentBody) throws AMQException
+    public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException
     {
 
     }
 
-    public void storeMessageMetaData(long messageId, MessageMetaData messageMetaData) throws AMQException
+    public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException
     {
 
     }

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Jan  5 15:16:50 2007
@@ -22,9 +22,9 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -36,6 +36,8 @@
 {
     private TestableMemoryMessageStore _store;
 
+    private StoreContext _storeContext = new StoreContext();
+
     protected void setUp() throws Exception
     {
         super.setUp();
@@ -48,14 +50,16 @@
     public void testMessageGetsRemoved() throws AMQException
     {
         createPersistentContentHeader();
-        AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(),
-                                            new NonTransactionalContext(_store, null, null, null),
+        // TODO: fix hardcoded protocol version data
+        AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
+                                                                                           (byte)0),
+                                            new NonTransactionalContext(_store, _storeContext, null, null, null),
                                             createPersistentContentHeader());
         message.incrementReference();
         // we call routing complete to set up the handle
-        message.routingComplete(_store, new MessageHandleFactory());
+        message.routingComplete(_store, _storeContext, new MessageHandleFactory());
         assertTrue(_store.getMessageMetaDataMap().size() == 1);
-        message.decrementReference();
+        message.decrementReference(_storeContext);
         assertTrue(_store.getMessageMetaDataMap().size() == 0);
     }
 
@@ -70,15 +74,17 @@
 
     public void testMessageRemains() throws AMQException
     {
-        AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody(),
-                                            new NonTransactionalContext(_store, null, null, null),
+        // TODO: fix hardcoded protocol version data
+        AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
+                                                                                           (byte)0),
+                                            new NonTransactionalContext(_store, _storeContext, null, null, null),
                                             createPersistentContentHeader());
         message.incrementReference();
         // we call routing complete to set up the handle
-        message.routingComplete(_store, new MessageHandleFactory());
+        message.routingComplete(_store, _storeContext, new MessageHandleFactory());
         assertTrue(_store.getMessageMetaDataMap().size() == 1);
         message.incrementReference();
-        message.decrementReference();
+        message.decrementReference(_storeContext);
         assertTrue(_store.getMessageMetaDataMap().size() == 1);
     }
 

Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java?view=diff&rev=493231&r1=493230&r2=493231
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java Fri Jan  5 15:16:50 2007
@@ -24,6 +24,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.StoreContext;
 
 import java.util.LinkedList;
 
@@ -43,7 +44,7 @@
         buffer.enlist(op);
         buffer.enlist(new MockOp().expectPrepare().expectCommit());
 
-        buffer.commit();
+        buffer.commit(null);
 
         validateOps();
         store.validate();
@@ -58,7 +59,7 @@
         buffer.enlist(new MockOp().expectRollback());
         buffer.enlist(new MockOp().expectRollback());
 
-        buffer.rollback();
+        buffer.rollback(null);
 
         validateOps();
         store.validate();
@@ -77,7 +78,7 @@
         buffer.enlist(new FailedPrepare());
         buffer.enlist(new MockOp());
 
-        buffer.commit();
+        buffer.commit(null);
         validateOps();
         store.validate();
     }
@@ -95,7 +96,7 @@
         buffer.enlist(new StoreMessageOperation(store));
         buffer.enlist(new TxnTester(store));
 
-        buffer.commit();
+        buffer.commit(null);
         validateOps();
         store.validate();
     }
@@ -127,12 +128,12 @@
             ops.add(this);
         }
 
-        public void prepare()
+        public void prepare(StoreContext context)
         {
             assertEquals(expected.removeLast(), PREPARE);
         }
 
-        public void commit()
+        public void commit(StoreContext context)
         {
             assertEquals(expected.removeLast(), COMMIT);
         }
@@ -142,7 +143,7 @@
             assertEquals(expected.removeLast(), UNDO_PREPARE);
         }
 
-        public void rollback()
+        public void rollback(StoreContext context)
         {
             assertEquals(expected.removeLast(), ROLLBACK);
         }
@@ -249,16 +250,16 @@
 
     class NullOp implements TxnOp
     {
-        public void prepare() throws AMQException
+        public void prepare(StoreContext context) throws AMQException
         {
         }
-        public void commit()
+        public void commit(StoreContext context)
         {
         }
         public void undoPrepare()
         {
         }
-        public void rollback()
+        public void rollback(StoreContext context)
         {
         }
     }
@@ -275,6 +276,8 @@
     {
         private final MessageStore store;
 
+        private final StoreContext context = new StoreContext();
+
         TxnTester(MessageStore store)
         {
             this.store = store;
@@ -282,12 +285,12 @@
 
         public void prepare() throws AMQException
         {
-            assertTrue("Expected prepare to be performed under txn", store.inTran());
+            assertTrue("Expected prepare to be performed under txn", store.inTran(context));
         }
 
         public void commit()
         {
-            assertTrue("Expected commit not to be performed under txn", !store.inTran());
+            assertTrue("Expected commit not to be performed under txn", !store.inTran(context));
         }
     }