You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC
svn commit: r827724 [7/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apach...
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Tue Oct 20 16:23:01 2009
@@ -21,14 +21,16 @@
package org.apache.qpid.server.virtualhost;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -45,16 +47,15 @@
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import javax.management.NotCompliantMBeanException;
import java.util.Collections;
@@ -63,9 +64,9 @@
import java.util.Timer;
import java.util.TimerTask;
-public class VirtualHost implements Accessable
+public class VirtualHostImpl implements Accessable, VirtualHost
{
- private static final Logger _logger = Logger.getLogger(VirtualHost.class);
+ private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
private final String _name;
@@ -133,9 +134,9 @@
return _name.toString();
}
- public VirtualHost getVirtualHost()
+ public VirtualHostImpl getVirtualHost()
{
- return VirtualHost.this;
+ return VirtualHostImpl.this;
}
} // End of MBean class
@@ -147,17 +148,17 @@
*
* @throws Exception
*/
- public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception
+ public VirtualHostImpl(VirtualHostConfiguration hostConfig) throws Exception
{
this(hostConfig, null);
}
- public VirtualHost(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+ public VirtualHostImpl(VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
{
_configuration = hostConfig;
_name = hostConfig.getName();
- CurrentActor.get().message(VirtualHostMessages.VHT_1001(_name));
+ CurrentActor.get().message(VirtualHostMessages.VHT_1001(_name));
if (_name == null || _name.length() == 0)
{
@@ -196,7 +197,7 @@
// Derby being one.
// todo this can be removed with the resolution fo QPID-2096
configFileRT.exchange.clear();
-
+
initialiseModel(hostConfig);
//todo REMOVE Work Around for QPID-2096
@@ -279,7 +280,7 @@
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
period / 2,
period);
-
+
class ForceChannelClosuresTask extends TimerTask
{
public void run()
@@ -289,7 +290,7 @@
}
}
}
-
+
private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
{
String messageStoreClass = hostConfig.getMessageStoreClass();
@@ -303,7 +304,24 @@
" does not.");
}
MessageStore messageStore = (MessageStore) o;
- messageStore.configure(this, "store", hostConfig);
+ VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
+
+ MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
+
+ messageStore.configureConfigStore(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
+
+ messageStore.configureMessageStore(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
+ messageStore.configureTransactionLog(this.getName(),
+ recoveryHandler,
+ hostConfig.getStoreConfiguration(),
+ storeLogSubject);
+
_messageStore = messageStore;
_durableConfigurationStore = messageStore;
}
@@ -450,13 +468,13 @@
{
queue.stop();
}
- }
+ }
//Stop Housekeeping
if (_houseKeepingTimer != null)
{
_houseKeepingTimer.cancel();
- }
+ }
//Close MessageStore
if (_messageStore != null)
@@ -503,6 +521,14 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void configureConfigStore(String name,
+ ConfigurationRecoveryHandler recoveryHandler,
+ Configuration config,
+ LogSubject logSubject) throws Exception
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void createExchange(Exchange exchange) throws AMQException
{
if (exchange.isDurable())
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Tue Oct 20 16:23:01 2009
@@ -21,7 +21,6 @@
package org.apache.qpid.server.virtualhost;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,7 +30,7 @@
public class VirtualHostRegistry
{
- private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+ private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
private String _defaultVirtualHostName;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java Tue Oct 20 16:23:01 2009
@@ -14,14 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.tools.messagestore.commands;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
public class Copy extends Move
{
@@ -49,7 +51,9 @@
protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
{
- fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), _storeContext);
+ ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
+ fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), txn);
+ txn.commit();
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Tue Oct 20 16:23:01 2009
@@ -21,9 +21,6 @@
package org.apache.qpid.tools.messagestore.commands;
import org.apache.commons.codec.binary.Hex;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.message.ServerMessage;
@@ -31,7 +28,6 @@
import org.apache.qpid.tools.utils.Console;
import java.io.UnsupportedEncodingException;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -137,114 +133,116 @@
hex.add(Console.ROW_DIVIDER);
ascii.add(Console.ROW_DIVIDER);
- if(msg instanceof AMQMessage)
+
+ final int messageSize = (int) msg.getSize();
+ if (messageSize != 0)
{
+ hex.add("Hex");
+ hex.add(Console.ROW_DIVIDER);
- Iterator bodies = ((AMQMessage)msg).getContentBodyIterator();
- if (bodies.hasNext())
- {
- hex.add("Hex");
- hex.add(Console.ROW_DIVIDER);
+ ascii.add("ASCII");
+ ascii.add(Console.ROW_DIVIDER);
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(64 * 1024);
- ascii.add("ASCII");
- ascii.add(Console.ROW_DIVIDER);
+ int position = 0;
- while (bodies.hasNext())
- {
- ContentChunk chunk = (ContentChunk) bodies.next();
+ while(position < messageSize)
+ {
+
+ position += msg.getContent(buf, position);
+ buf.flip();
+ //Duplicate so we don't destroy original data :)
+ java.nio.ByteBuffer hexBuffer = buf;
- //Duplicate so we don't destroy original data :)
- ByteBuffer hexBuffer = chunk.getData().duplicate();
+ java.nio.ByteBuffer charBuffer = hexBuffer.duplicate();
- ByteBuffer charBuffer = hexBuffer.duplicate();
+ Hex hexencoder = new Hex();
- Hex hexencoder = new Hex();
+ while (hexBuffer.hasRemaining())
+ {
+ byte[] line = new byte[LINE_SIZE];
- while (hexBuffer.hasRemaining())
+ int bufsize = hexBuffer.remaining();
+ if (bufsize < LINE_SIZE)
+ {
+ hexBuffer.get(line, 0, bufsize);
+ }
+ else
{
- byte[] line = new byte[LINE_SIZE];
+ bufsize = line.length;
+ hexBuffer.get(line);
+ }
- int bufsize = hexBuffer.remaining();
- if (bufsize < LINE_SIZE)
- {
- hexBuffer.get(line, 0, bufsize);
- }
- else
- {
- bufsize = line.length;
- hexBuffer.get(line);
- }
+ byte[] encoded = hexencoder.encode(line);
- byte[] encoded = hexencoder.encode(line);
+ try
+ {
+ String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
+ String hexLine = "";
- try
+ int strKength = encStr.length();
+ for (int c = 0; c < strKength; c++)
{
- String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
- String hexLine = "";
+ hexLine += encStr.charAt(c);
- int strKength = encStr.length();
- for (int c = 0; c < strKength; c++)
+ if (c % 2 == 1 && SPACE_BYTES)
{
- hexLine += encStr.charAt(c);
-
- if (c % 2 == 1 && SPACE_BYTES)
- {
- hexLine += BYTE_SPACER;
- }
+ hexLine += BYTE_SPACER;
}
-
- hex.add(hexLine);
}
- catch (UnsupportedEncodingException e)
- {
- _console.println(e.getMessage());
- return null;
- }
- }
- while (charBuffer.hasRemaining())
+ hex.add(hexLine);
+ }
+ catch (UnsupportedEncodingException e)
{
- String asciiLine = "";
+ _console.println(e.getMessage());
+ return null;
+ }
+ }
+
+ while (charBuffer.hasRemaining())
+ {
+ String asciiLine = "";
- for (int pos = 0; pos < LINE_SIZE; pos++)
+ for (int pos = 0; pos < LINE_SIZE; pos++)
+ {
+ if (charBuffer.hasRemaining())
{
- if (charBuffer.hasRemaining())
- {
- byte ch = charBuffer.get();
+ byte ch = charBuffer.get();
- if (isPrintable(ch))
- {
- asciiLine += (char) ch;
- }
- else
- {
- asciiLine += NON_PRINTING_ASCII_CHAR;
- }
-
- if (SPACE_BYTES)
- {
- asciiLine += BYTE_SPACER;
- }
+ if (isPrintable(ch))
+ {
+ asciiLine += (char) ch;
}
else
{
- break;
+ asciiLine += NON_PRINTING_ASCII_CHAR;
}
- }
- ascii.add(asciiLine);
+ if (SPACE_BYTES)
+ {
+ asciiLine += BYTE_SPACER;
+ }
+ }
+ else
+ {
+ break;
+ }
}
+
+ ascii.add(asciiLine);
}
+ buf.clear();
}
- else
- {
- List<String> result = new LinkedList<String>();
+ }
+ else
+ {
+ List<String> result = new LinkedList<String>();
- display.add(result);
- result.add("No ContentBodies");
- }
+ display.add(result);
+ result.add("No ContentBodies");
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java Tue Oct 20 16:23:01 2009
@@ -14,9 +14,9 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.tools.messagestore.commands;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Tue Oct 20 16:23:01 2009
@@ -14,17 +14,17 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.tools.messagestore.commands;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import java.util.LinkedList;
@@ -33,12 +33,6 @@
public class Move extends AbstractCommand
{
- /**
- * Since the Coopy command is not associated with a real channel we can safely create our own store context
- * for use in the few methods that require one.
- */
- protected StoreContext _storeContext = new StoreContext();
-
public Move(MessageStoreTool tool)
{
super(tool);
@@ -201,6 +195,8 @@
protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue)
{
- fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
+ ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
+ fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), txn);
+ txn.commit();
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Tue Oct 20 16:23:01 2009
@@ -25,7 +25,7 @@
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.message.ServerMessage;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java Tue Oct 20 16:23:01 2009
@@ -27,6 +27,7 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQBrokerManagerMBeanTest extends TestCase
@@ -46,7 +47,7 @@
assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
- ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
mbean.createNewExchange(exchange1, "direct", false);
mbean.createNewExchange(exchange2, "topic", false);
mbean.createNewExchange(exchange3, "headers", false);
@@ -68,7 +69,7 @@
{
String queueName = "testQueue_" + System.currentTimeMillis();
- ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
+ ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Tue Oct 20 16:23:01 2009
@@ -27,7 +27,7 @@
import org.apache.qpid.server.queue.MockAMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntryIterator;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.MockSubscription;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java Tue Oct 20 16:23:01 2009
@@ -14,21 +14,20 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.qpid.server.configuration;
import junit.framework.TestCase;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class VirtualHostConfigurationTest extends TestCase
{
@@ -55,50 +54,50 @@
super.tearDown();
}
-
+
public void testQueuePriority() throws Exception
{
// Set up queue with 5 priorities
- configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
+ configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
"atest");
- configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange",
+ configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange",
"amq.direct");
- configXml.addProperty("virtualhost.test.queues.queue.atest.priorities",
+ configXml.addProperty("virtualhost.test.queues.queue.atest.priorities",
"5");
// Set up queue with JMS style priorities
- configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
+ configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
"ptest");
- configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange",
+ configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange",
"amq.direct");
- configXml.addProperty("virtualhost.test.queues.queue.ptest.priority",
+ configXml.addProperty("virtualhost.test.queues.queue.ptest.priority",
"true");
-
+
// Set up queue with no priorities
- configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
+ configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
"ntest");
- configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange",
+ configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange",
"amq.direct");
- configXml.addProperty("virtualhost.test.queues.queue.ntest.priority",
+ configXml.addProperty("virtualhost.test.queues.queue.ntest.priority",
"false");
-
- VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
-
+
+ VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
+
// Check that atest was a priority queue with 5 priorities
AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
assertTrue(atest instanceof AMQPriorityQueue);
assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
-
+
// Check that ptest was a priority queue with 10 priorities
AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest"));
assertTrue(ptest instanceof AMQPriorityQueue);
assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
-
+
// Check that ntest wasn't a priority queue
AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest"));
assertFalse(ntest instanceof AMQPriorityQueue);
}
-
+
public void testQueueAlerts() throws Exception
{
// Set up queue with 5 priorities
@@ -106,7 +105,7 @@
configXml.addProperty("virtualhost.test.queues.maximumQueueDepth", "1");
configXml.addProperty("virtualhost.test.queues.maximumMessageSize", "2");
configXml.addProperty("virtualhost.test.queues.maximumMessageAge", "3");
-
+
configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest");
configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct");
configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumQueueDepth", "4");
@@ -114,21 +113,21 @@
configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageAge", "6");
configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest");
-
- VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
-
+
+ VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
+
// Check specifically configured values
AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
assertEquals(4, aTest.getMaximumQueueDepth());
assertEquals(5, aTest.getMaximumMessageSize());
assertEquals(6, aTest.getMaximumMessageAge());
-
- // Check default values
+
+ // Check default values
AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest"));
assertEquals(1, bTest.getMaximumQueueDepth());
assertEquals(2, bTest.getMaximumMessageSize());
assertEquals(3, bTest.getMaximumMessageAge());
-
+
}
-
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Oct 20 16:23:01 2009
@@ -27,15 +27,18 @@
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
public class AbstractHeadersExchangeTestBase extends TestCase
{
@@ -49,8 +52,6 @@
*/
private MessageStore _store = new MemoryMessageStore();
- private MessageHandleFactory _handleFactory = new MessageHandleFactory();
-
private int count;
public void testDoNothing()
@@ -88,8 +89,8 @@
protected int route(Message m) throws AMQException
{
+ m.getIncomingMessage().headersReceived();
m.route(exchange);
- m.getIncomingMessage().routingComplete(_store, _handleFactory);
if(m.getIncomingMessage().allContentReceived())
{
for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
@@ -343,7 +344,7 @@
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void setRedelivered(boolean b)
+ public void setRedelivered()
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -452,6 +453,8 @@
*/
static class Message extends AMQMessage
{
+ private static AtomicLong _messageId = new AtomicLong();
+
private class TestIncomingMessage extends IncomingMessage
{
@@ -459,7 +462,7 @@
final MessagePublishInfo info,
final AMQProtocolSession publisher)
{
- super(messageId, info, publisher);
+ super(info);
}
@@ -484,7 +487,6 @@
private IncomingMessage _incoming;
- private static MessageStore _messageStore = new SkeletonMessageStore();
Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
{
@@ -493,7 +495,7 @@
Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
{
- this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+ this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST);
}
public IncomingMessage getIncomingMessage()
@@ -506,32 +508,27 @@
ContentHeaderBody header,
List<ContentBody> bodies) throws AMQException
{
- super(createMessageHandle(messageId, publish, header), header, header.bodySize, publish);
+ super(new MockStoredMessage(messageId, publish, header));
+
+ StoredMessage<MessageMetaData> storedMessage = getStoredMessage();
+ int pos = 0;
+ for(ContentBody body : bodies)
+ {
+ storedMessage.addContent(pos, body.payload.duplicate().buf());
+ pos += body.payload.limit();
+ }
-
_incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
_incoming.setContentHeaderBody(header);
}
- private static AMQMessageHandle createMessageHandle(final long messageId,
- final MessagePublishInfo publish,
- final ContentHeaderBody header)
- {
-
- final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
- _messageStore,
- true);
-
- amqMessageHandle.setPublishAndContentHeaderBody(publish,header);
- return amqMessageHandle;
- }
private Message(AMQMessage msg) throws AMQException
{
- super(msg);
+ super(msg.getStoredMessage());
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java Tue Oct 20 16:23:01 2009
@@ -29,6 +29,7 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Tue Oct 20 16:23:01 2009
@@ -53,6 +53,16 @@
return null;
}
+ public String getMimeType()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public String getEncoding()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public byte getPriority()
{
return 0;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,8 @@
super.setUp();
// AR will use the NullAR by default
// Just use the first vhost.
- VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+ VirtualHost
+ virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
_protocolSession = new InternalTestProtocolSession(virtualHost);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Tue Oct 20 16:23:01 2009
@@ -14,9 +14,9 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.exchange;
@@ -28,9 +28,12 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
public class TopicExchangeTest extends TestCase
@@ -54,19 +57,19 @@
public void tearDown()
{
- ApplicationRegistry.remove();
+ ApplicationRegistry.remove();
}
public void testNoRoute() throws AMQException
{
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("a*#b"), false, null, false, _vhost, null);
_exchange.registerQueue(new AMQShortString("a.*.#.b"), queue, null);
MessagePublishInfo info = new PublishInfo(new AMQShortString("a.b"));
- IncomingMessage message = new IncomingMessage(0L, info, _protocolSession);
+ IncomingMessage message = new IncomingMessage(info);
message.enqueue(_exchange.route(message));
@@ -349,9 +352,11 @@
private int routeMessage(final IncomingMessage message)
throws AMQException
{
+ MessageMetaData mmd = message.headersReceived();
+ message.setStoredMessage(_store.addMessage(mmd));
+
message.enqueue(_exchange.route(message));
- message.routingComplete(_store, new MessageHandleFactory());
- AMQMessage msg = new AMQMessage(message.getMessageHandle(), message.getContentHeader(), message.getSize(), message.getMessagePublishInfo());
+ AMQMessage msg = new AMQMessage(message.getStoredMessage());
for(AMQQueue q : message.getDestinationQueues())
{
q.enqueue(msg);
@@ -393,8 +398,11 @@
{
MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
- IncomingMessage message = new IncomingMessage(0L, info, _protocolSession);
- message.setContentHeaderBody( new ContentHeaderBody());
+ IncomingMessage message = new IncomingMessage(info);
+ final ContentHeaderBody chb = new ContentHeaderBody();
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ chb.properties = props;
+ message.setContentHeaderBody(chb);
return message;
@@ -417,7 +425,7 @@
public void setExchange(AMQShortString exchange)
{
-
+
}
public boolean isImmediate()
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java Tue Oct 20 16:23:01 2009
@@ -119,7 +119,7 @@
// Verify that the message has the correct type
assertTrue("Message contains the [con: prefix",
logs.get(0).toString().contains("[con:"));
-
+
// Verify that all the values were presented to the MessageFormatter
// so we will not end up with '{n}' entries in the log.
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java Tue Oct 20 16:23:01 2009
@@ -73,7 +73,7 @@
// Correctly Close the AR we created
ApplicationRegistry.remove();
- super.tearDown();
+ super.tearDown();
}
private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java Tue Oct 20 16:23:01 2009
@@ -59,7 +59,7 @@
validateLogMessage(log, "MST-1003", expected);
}
- public void testMessage1004()
+ /* public void testMessage1004()
{
_logMessage = MessageStoreMessages.MST_1004(null,false);
List<Object> log = performLog();
@@ -91,7 +91,7 @@
// Here we use MessageFormat to ensure the messasgeCount of 2000 is
// reformated for display as '2,000'
- String[] expected = {"Recovered ",
+ String[] expected = {"Recovered ",
MessageFormat.format("{0,number}", messasgeCount),
"messages for queue", queueName};
@@ -119,5 +119,5 @@
validateLogMessage(log, "MST-1006", expected);
}
-
+ */
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Tue Oct 20 16:23:01 2009
@@ -23,19 +23,16 @@
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import javax.management.JMException;
-import java.security.Principal;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class AMQProtocolSessionMBeanTest extends TestCase
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Tue Oct 20 16:23:01 2009
@@ -27,13 +27,13 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.transport.TestNetworkDriver;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
@@ -68,7 +68,17 @@
public byte getProtocolMajorVersion()
{
return (byte) 8;
- }
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo,
+ ContentHeaderBody header,
+ MessageContentSource msgContent,
+ int channelId,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
public byte getProtocolMinorVersion()
{
@@ -82,12 +92,12 @@
synchronized (_channelDelivers)
{
List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
-
+
if (all == null)
{
return new ArrayList<DeliveryPair>(0);
}
-
+
List<DeliveryPair> msgs = all.subList(0, count);
List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
@@ -99,15 +109,6 @@
}
}
- public void writeReturn(MessagePublishInfo messagePublishInfo,
- ContentHeaderBody header,
- Iterator<AMQDataBlock> bodyFrameIterator,
- int channelId,
- int replyCode,
- AMQShortString replyText) throws AMQException
- {
-
- }
// *** ProtocolOutputConverter Implementation
public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Tue Oct 20 16:23:01 2009
@@ -22,17 +22,10 @@
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-
-import java.security.Principal;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
@@ -66,14 +59,14 @@
}
assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
}
-
+
@Override
public void setUp()
{
//Highlight that this test will cause a new AR to be created
ApplicationRegistry.getInstance();
}
-
+
@Override
public void tearDown() throws Exception
{
@@ -87,7 +80,7 @@
{
// Correctly Close the AR we created
ApplicationRegistry.remove();
- }
+ }
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Tue Oct 20 16:23:01 2009
@@ -1,6 +1,6 @@
package org.apache.qpid.server.queue;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,21 +8,22 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import java.util.ArrayList;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import junit.framework.AssertionFailedError;
@@ -45,21 +46,21 @@
_queue.enqueue(createMessage(1L, (byte) 10));
_queue.enqueue(createMessage(2L, (byte) 4));
_queue.enqueue(createMessage(3L, (byte) 0));
-
+
// Enqueue messages in reverse order
_queue.enqueue(createMessage(4L, (byte) 0));
_queue.enqueue(createMessage(5L, (byte) 4));
_queue.enqueue(createMessage(6L, (byte) 10));
-
+
// Enqueue messages out of order
_queue.enqueue(createMessage(7L, (byte) 4));
_queue.enqueue(createMessage(8L, (byte) 10));
_queue.enqueue(createMessage(9L, (byte) 0));
-
+
// Register subscriber
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
-
+
ArrayList<QueueEntry> msgs = _subscription.getMessages();
try
{
@@ -98,10 +99,10 @@
msg.getContentHeaderBody().properties = props;
return msg;
}
-
+
protected AMQMessage createMessage(Long id) throws AMQException
{
return createMessage(id, (byte) 0);
}
-
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Tue Oct 20 16:23:01 2009
@@ -25,9 +25,12 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -44,7 +47,7 @@
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
-{
+{
private final static long MAX_MESSAGE_COUNT = 50;
private final static long MAX_MESSAGE_AGE = 250; // 0.25 sec
private final static long MAX_MESSAGE_SIZE = 2000; // 2 KB
@@ -232,7 +235,7 @@
_queue.registerSubscription(
subscription2, false);
-
+
while (_queue.getUndeliveredMessageCount()!= 0)
{
Thread.sleep(100);
@@ -251,7 +254,7 @@
_queueMBean.clearQueue();
assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
}
-
+
protected IncomingMessage message(final boolean immediate, long size) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
@@ -284,8 +287,10 @@
};
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ contentHeaderBody.properties = props;
contentHeaderBody.bodySize = size; // in bytes
- IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
+ IncomingMessage message = new IncomingMessage(publish);
message.setContentHeaderBody(contentHeaderBody);
return message;
@@ -312,13 +317,16 @@
private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
{
IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
+ MessageMetaData[] metaData = new MessageMetaData[(int) messageCount];
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
+ metaData[i] = messages[i].headersReceived();
+ messages[i].setStoredMessage(_messageStore.addMessage(metaData[i]));
+
messages[i].enqueue(qs);
- messages[i].routingComplete(_messageStore, new MessageHandleFactory());
}
@@ -328,6 +336,10 @@
ByteBuffer _data = ByteBuffer.allocate((int)size);
+ {
+ _data.limit((int)size);
+ }
+
public int getSize()
{
return (int) size;
@@ -340,13 +352,11 @@
public void reduceToFit()
{
-
+
}
});
- _queue.enqueue(new AMQMessage(messages[i].getMessageHandle(),
- messages[i].getContentHeader(),
- messages[i].getSize(),
- messages[i].getMessagePublishInfo()));
+
+ _queue.enqueue(new AMQMessage(messages[i].getStoredMessage()));
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Tue Oct 20 16:23:01 2009
@@ -25,7 +25,6 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
public class AMQQueueFactoryTest extends TestCase
{
@@ -55,31 +54,18 @@
FieldTable fieldTable = new FieldTable();
fieldTable.put(new AMQShortString(AMQQueueFactory.X_QPID_PRIORITIES), 5);
- try
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
- _virtualHost, fieldTable);
-
- assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testPriorityQueue"), false, new AMQShortString("owner"), false,
+ _virtualHost, fieldTable);
+
+ assertEquals("Queue not a priorty queue", AMQPriorityQueue.class, queue.getClass());
}
public void testSimpleQueueRegistration()
{
- try
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
- _virtualHost, null);
- assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue"), false, new AMQShortString("owner"), false,
+ _virtualHost, null);
+ assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Oct 20 16:23:01 2009
@@ -29,6 +29,10 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -41,6 +45,7 @@
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.mina.common.ByteBuffer;
+import org.apache.commons.configuration.PropertiesConfiguration;
import javax.management.JMException;
@@ -151,13 +156,11 @@
private void verifyBrokerState()
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+ TestableMemoryMessageStore store = (TestableMemoryMessageStore)_virtualHost.getMessageStore();
// Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
- assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
- assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
- assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
- assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+
+ assertEquals("Store should have no messages:" + store.getMessageCount(), 0, store.getMessageCount());
}
public void testConsumerCount() throws AMQException
@@ -266,16 +269,22 @@
}
IncomingMessage msg = message(false, false);
- long id = msg.getMessageNumber();
_queue.clearQueue();
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, new MessageHandleFactory());
+ MessageMetaData mmd = msg.headersReceived();
+ msg.setStoredMessage(_messageStore.addMessage(mmd));
+ long id = msg.getMessageNumber();
+
msg.addContentBodyFrame(new ContentChunk()
{
ByteBuffer _data = ByteBuffer.allocate((int)MESSAGE_SIZE);
+ {
+ _data.limit((int)MESSAGE_SIZE);
+ }
+
public int getSize()
{
return (int) MESSAGE_SIZE;
@@ -292,7 +301,7 @@
}
});
- AMQMessage m = new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo());
+ AMQMessage m = new AMQMessage(msg.getStoredMessage());
for(AMQQueue q : msg.getDestinationQueues())
{
q.enqueue(m);
@@ -345,7 +354,7 @@
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
contentHeaderBody.properties = new BasicContentHeaderProperties();
((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
- IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _protocolSession);
+ IncomingMessage msg = new IncomingMessage(publish);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
@@ -355,7 +364,14 @@
protected void setUp() throws Exception
{
super.setUp();
- IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
+
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+ IApplicationRegistry applicationRegistry = new TestApplicationRegistry(new ServerConfiguration(configuration));
+ ApplicationRegistry.initialise(applicationRegistry );
+
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
_messageStore = _virtualHost.getMessageStore();
@@ -381,7 +397,8 @@
currentMessage.enqueue(qs);
// route header
- currentMessage.routingComplete(_messageStore, new MessageHandleFactory());
+ MessageMetaData mmd = currentMessage.headersReceived();
+ currentMessage.setStoredMessage(_messageStore.addMessage(mmd));
// Add the body so we have somthing to test later
currentMessage.addContentBodyFrame(
@@ -391,7 +408,7 @@
new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
MESSAGE_SIZE)));
- AMQMessage m = new AMQMessage(currentMessage.getMessageHandle(), currentMessage.getContentHeader(), currentMessage.getSize(), currentMessage.getMessagePublishInfo());
+ AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
for(AMQQueue q : currentMessage.getDestinationQueues())
{
q.enqueue(m);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Tue Oct 20 16:23:01 2009
@@ -28,7 +28,9 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -93,7 +95,6 @@
private void publishMessages(int count, boolean persistent) throws AMQException
{
_queue.registerSubscription(_subscription,false);
- MessageHandleFactory factory = new MessageHandleFactory();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -126,37 +127,37 @@
return new AMQShortString("rk");
}
};
- final IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publishBody, _protocolSession);
+ final IncomingMessage msg = new IncomingMessage(publishBody);
//IncomingMessage msg2 = null;
+ BasicContentHeaderProperties b = new BasicContentHeaderProperties();
+ ContentHeaderBody cb = new ContentHeaderBody();
+ cb.properties = b;
+
if (persistent)
{
- BasicContentHeaderProperties b = new BasicContentHeaderProperties();
//This is DeliveryMode.PERSISTENT
b.setDeliveryMode((byte) 2);
- ContentHeaderBody cb = new ContentHeaderBody();
- cb.properties = b;
- msg.setContentHeaderBody(cb);
- }
- else
- {
- msg.setContentHeaderBody(new ContentHeaderBody());
}
+
+ msg.setContentHeaderBody(cb);
+
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore, factory);
+ MessageMetaData mmd = msg.headersReceived();
+ msg.setStoredMessage(_messageStore.addMessage(mmd));
if(msg.allContentReceived())
{
- Transaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, msg, new Transaction.Action() {
+ ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+ txn.enqueue(_queue, msg, new ServerTransaction.Action() {
public void postCommit()
{
try
{
- _queue.enqueue(new AMQMessage(msg.getMessageHandle(), msg.getContentHeader(), msg.getSize(), msg.getMessagePublishInfo()));
+ _queue.enqueue(new AMQMessage(msg.getStoredMessage()));
}
catch (AMQException e)
{
@@ -213,8 +214,8 @@
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- assertTrue(_messageStore.getContentBodyMap().size() == 0);
+ assertTrue(_messageStore.getMessageCount() == 0);
+
}
@@ -230,8 +231,8 @@
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- assertTrue(_messageStore.getContentBodyMap().size() == 0);
+ assertTrue(_messageStore.getMessageCount() == 0);
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java Tue Oct 20 16:23:01 2009
@@ -21,22 +21,17 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.AMQMessage;
public class MockAMQMessage extends AMQMessage
{
public MockAMQMessage(long messageId)
throws AMQException
{
- super(new MockAMQMessageHandle(messageId) ,
- (MessagePublishInfo)new MockMessagePublishInfo());
+ super(new MockStoredMessage(messageId));
}
- protected MockAMQMessage(AMQMessage msg)
- throws AMQException
- {
- super(msg);
- }
+
@Override
@@ -44,4 +39,5 @@
{
return 0l;
}
+
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Tue Oct 20 16:23:01 2009
@@ -25,12 +25,12 @@
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.AMQException;
import java.util.List;
@@ -57,6 +57,11 @@
return _name;
}
+ public void setNoLocal(boolean b)
+ {
+
+ }
+
public boolean isDurable()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -216,18 +221,18 @@
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
-
+
public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
{
return null;
}
- public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+ public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext)
+ public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
{
//To change body of implemented methods use File | Settings | File Templates.
}
@@ -352,7 +357,7 @@
}
public void checkCapacity(AMQChannel channel)
- {
+ {
}
public ManagedObject getManagedObject()
@@ -367,7 +372,7 @@
public void setMinimumAlertRepeatGap(long value)
{
-
+
}
public long getCapacity()
@@ -392,7 +397,7 @@
public void configure(QueueConfiguration config)
{
-
+
}
public PrincipalHolder getPrincipalHolder()
@@ -416,4 +421,8 @@
}
+ public String getResourceName()
+ {
+ return _name.toString();
+ }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Tue Oct 20 16:23:01 2009
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
public class MockQueueEntry implements QueueEntry
{
@@ -71,7 +72,7 @@
public void routeToAlternate()
{
-
+
}
public void dispose()
@@ -124,35 +125,35 @@
return false;
}
-
+
public boolean isQueueDeleted()
{
return false;
}
-
+
public boolean isRejectedBy(Subscription subscription)
{
return false;
}
-
+
public void reject()
{
}
-
+
public void reject(Subscription subscription)
{
}
-
+
public void release()
{
@@ -171,7 +172,7 @@
return false;
}
-
+
public void requeue()
{
@@ -190,8 +191,8 @@
}
-
- public void setRedelivered(boolean b)
+
+ public void setRedelivered()
{
@@ -209,7 +210,7 @@
public boolean isRedelivered()
{
- return false;
+ return false;
}
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,92 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+
+import java.nio.ByteBuffer;
+
+public class MockStoredMessage implements StoredMessage<MessageMetaData>
+{
+ private long _messageId;
+ private MessageMetaData _metaData;
+ private final ByteBuffer _content;
+
+
+ public MockStoredMessage(long messageId)
+ {
+ this(messageId, new MockMessagePublishInfo(), new ContentHeaderBody(new BasicContentHeaderProperties(), 60));
+ }
+
+ public MockStoredMessage(long messageId, MessagePublishInfo info, ContentHeaderBody chb)
+ {
+ _messageId = messageId;
+ _metaData = new MessageMetaData(info, chb, 0);
+ _content = ByteBuffer.allocate(_metaData.getContentSize());
+
+ }
+
+ public MessageMetaData getMetaData()
+ {
+ return _metaData;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ src = src.duplicate();
+ ByteBuffer dst = _content.duplicate();
+ dst.position(offsetInMessage);
+ dst.put(src);
+ }
+
+ public int getContent(int offset, ByteBuffer dst)
+ {
+ ByteBuffer src = _content.duplicate();
+ src.position(offset);
+ src = src.slice();
+ if(dst.remaining() < src.limit())
+ {
+ src.limit(dst.remaining());
+ }
+ dst.put(src);
+ return src.limit();
+ }
+
+ public TransactionLog.StoreFuture flushToStore()
+ {
+ return MessageStore.IMMEDIATE_FUTURE;
+ }
+
+ public void remove()
+ {
+ }
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org