You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/07/20 21:05:08 UTC
svn commit: r795958 [3/3] - in /qpid/branches/java-broker-0-10/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apache/q...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Mon Jul 20 19:05:05 2009
@@ -25,6 +25,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -93,10 +94,11 @@
public void process() throws AMQException
{
- _message.incrementReference();
+ MessageReference ref = _message.newReference();
try
{
- QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+ StoreContext.setCurrentContext(getStoreContext());
+ QueueEntry entry = _queue.enqueue(_message);
if(entry.immediateAndNotDelivered())
{
@@ -105,7 +107,8 @@
}
finally
{
- _message.decrementReference(getStoreContext());
+ ref.release();
+ StoreContext.clearCurrentContext();
}
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Mon Jul 20 19:05:05 2009
@@ -90,8 +90,13 @@
public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
- QueueEntry entry = queue.enqueue(_storeContext, message);
-
+ StoreContext.setCurrentContext(getStoreContext());
+
+ QueueEntry entry = queue.enqueue(message);
+
+ StoreContext.clearCurrentContext();
+
+
//following check implements the functionality
//required by the 'immediate' flag:
if(entry.immediateAndNotDelivered())
@@ -128,7 +133,7 @@
{
if (debug)
{
- _log.debug("Discarding message: " + message.getMessage().getMessageId());
+ _log.debug("Discarding message: " + message.getMessage().getMessageNumber());
}
if(message.getMessage().isPersistent())
{
@@ -171,7 +176,7 @@
if (debug)
{
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
+ _log.debug("Discarding message: " + msg.getMessage().getMessageNumber());
}
if(msg.getMessage().isPersistent())
{
@@ -187,7 +192,7 @@
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.getMessage().getMessageId());
+ msg.getMessage().getMessageNumber());
}
}
if(_inTran)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Mon Jul 20 19:05:05 2009
@@ -26,6 +26,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -100,7 +101,7 @@
for (QueueEntry entry : messages)
{
- AMQMessage msg = entry.getMessage();
+ ServerMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
@@ -112,7 +113,7 @@
// Show general message information
hex.add(Show.Columns.ID.name());
- ascii.add(msg.getMessageId().toString());
+ ascii.add(msg.getMessageNumber().toString());
hex.add(Console.ROW_DIVIDER);
ascii.add(Console.ROW_DIVIDER);
@@ -136,110 +137,114 @@
hex.add(Console.ROW_DIVIDER);
ascii.add(Console.ROW_DIVIDER);
- Iterator bodies = msg.getContentBodyIterator();
- if (bodies.hasNext())
+ if(msg instanceof AMQMessage)
{
- hex.add("Hex");
- hex.add(Console.ROW_DIVIDER);
+ Iterator bodies = ((AMQMessage)msg).getContentBodyIterator();
+ if (bodies.hasNext())
+ {
+ hex.add("Hex");
+ hex.add(Console.ROW_DIVIDER);
- ascii.add("ASCII");
- ascii.add(Console.ROW_DIVIDER);
- while (bodies.hasNext())
- {
- ContentChunk chunk = (ContentChunk) bodies.next();
+ ascii.add("ASCII");
+ ascii.add(Console.ROW_DIVIDER);
- //Duplicate so we don't destroy original data :)
- ByteBuffer hexBuffer = chunk.getData().duplicate();
+ while (bodies.hasNext())
+ {
+ ContentChunk chunk = (ContentChunk) bodies.next();
- ByteBuffer charBuffer = hexBuffer.duplicate();
+ //Duplicate so we don't destroy original data :)
+ ByteBuffer hexBuffer = chunk.getData().duplicate();
- Hex hexencoder = new Hex();
+ ByteBuffer charBuffer = hexBuffer.duplicate();
- while (hexBuffer.hasRemaining())
- {
- byte[] line = new byte[LINE_SIZE];
+ Hex hexencoder = new Hex();
- int bufsize = hexBuffer.remaining();
- if (bufsize < LINE_SIZE)
- {
- hexBuffer.get(line, 0, bufsize);
- }
- else
+ while (hexBuffer.hasRemaining())
{
- bufsize = line.length;
- hexBuffer.get(line);
- }
+ byte[] line = new byte[LINE_SIZE];
- byte[] encoded = hexencoder.encode(line);
+ int bufsize = hexBuffer.remaining();
+ if (bufsize < LINE_SIZE)
+ {
+ hexBuffer.get(line, 0, bufsize);
+ }
+ else
+ {
+ bufsize = line.length;
+ hexBuffer.get(line);
+ }
- try
- {
- String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
- String hexLine = "";
+ byte[] encoded = hexencoder.encode(line);
- int strKength = encStr.length();
- for (int c = 0; c < strKength; c++)
+ try
{
- hexLine += encStr.charAt(c);
+ String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
+ String hexLine = "";
- if (c % 2 == 1 && SPACE_BYTES)
+ int strKength = encStr.length();
+ for (int c = 0; c < strKength; c++)
{
- hexLine += BYTE_SPACER;
+ hexLine += encStr.charAt(c);
+
+ if (c % 2 == 1 && SPACE_BYTES)
+ {
+ hexLine += BYTE_SPACER;
+ }
}
- }
- hex.add(hexLine);
- }
- catch (UnsupportedEncodingException e)
- {
- _console.println(e.getMessage());
- return null;
+ hex.add(hexLine);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ _console.println(e.getMessage());
+ return null;
+ }
}
- }
- while (charBuffer.hasRemaining())
- {
- String asciiLine = "";
-
- for (int pos = 0; pos < LINE_SIZE; pos++)
+ while (charBuffer.hasRemaining())
{
- if (charBuffer.hasRemaining())
- {
- byte ch = charBuffer.get();
+ String asciiLine = "";
- if (isPrintable(ch))
+ for (int pos = 0; pos < LINE_SIZE; pos++)
+ {
+ if (charBuffer.hasRemaining())
{
- asciiLine += (char) ch;
+ byte ch = charBuffer.get();
+
+ if (isPrintable(ch))
+ {
+ asciiLine += (char) ch;
+ }
+ else
+ {
+ asciiLine += NON_PRINTING_ASCII_CHAR;
+ }
+
+ if (SPACE_BYTES)
+ {
+ asciiLine += BYTE_SPACER;
+ }
}
else
{
- asciiLine += NON_PRINTING_ASCII_CHAR;
- }
-
- if (SPACE_BYTES)
- {
- asciiLine += BYTE_SPACER;
+ break;
}
}
- else
- {
- break;
- }
- }
- ascii.add(asciiLine);
+ ascii.add(asciiLine);
+ }
}
}
- }
- else
- {
- List<String> result = new LinkedList<String>();
+ else
+ {
+ List<String> result = new LinkedList<String>();
- display.add(result);
- result.add("No ContentBodies");
+ display.add(result);
+ result.add("No ContentBodies");
+ }
}
}
@@ -252,7 +257,7 @@
return display;
}
- private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
+ private void addShowInformation(List<String> column1, List<String> column2, ServerMessage msg,
String title, boolean routing, boolean headers, boolean messageHeaders)
{
List<QueueEntry> single = new LinkedList<QueueEntry>();
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Mon Jul 20 19:05:05 2009
@@ -172,7 +172,7 @@
{
for (QueueEntry msg : messages)
{
- ids.add(msg.getMessage().getMessageId());
+ ids.add(msg.getMessage().getMessageNumber());
}
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Mon Jul 20 19:05:05 2009
@@ -26,9 +26,9 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -337,30 +337,24 @@
//Add create the table of data
for (QueueEntry entry : messages)
{
- AMQMessage msg = entry.getMessage();
+ ServerMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
}
- id.add(msg.getMessageId().toString());
+ id.add(msg.getMessageNumber().toString());
size.add("" + msg.getSize());
arrival.add("" + msg.getArrivalTime());
- try
- {
- ispersitent.add(msg.isPersistent() ? "true" : "false");
- }
- catch (AMQException e)
- {
- ispersitent.add("n/a");
- }
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
+
isredelivered.add(msg.isRedelivered() ? "true" : "false");
- isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
+ isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false");
// msg.getMessageHandle();
@@ -368,7 +362,10 @@
try
{
- headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
+ if(msg instanceof AMQMessage)
+ {
+ headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
+ }
}
catch (AMQException e)
{
@@ -417,7 +414,11 @@
MessagePublishInfo info = null;
try
{
- info = msg.getMessagePublishInfo();
+ if(msg instanceof AMQMessage)
+ {
+ info = ((AMQMessage)msg).getMessagePublishInfo();
+ }
+
}
catch (AMQException e)
{
@@ -457,14 +458,14 @@
return data;
}
- protected boolean includeMsg(AMQMessage msg, List<Long> msgids)
+ protected boolean includeMsg(ServerMessage msg, List<Long> msgids)
{
if (msgids == null)
{
return true;
}
- Long msgid = msg.getMessageId();
+ Long msgid = msg.getMessageNumber();
boolean found = false;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Mon Jul 20 19:05:05 2009
@@ -89,7 +89,7 @@
while(queueEntries.advance())
{
QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry);
+ _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
// Store the entry for future inspection
_referenceList.add(entry);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Mon Jul 20 19:05:05 2009
@@ -38,7 +38,6 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -163,7 +162,9 @@
};
TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
- _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
+ StoreContext sc = StoreContext.setCurrentContext(new StoreContext());
+ _map.add(deliveryTag, _queue.enqueue(message));
+ StoreContext.setCurrentContext(sc);
}
_acked = acked;
_unacked = unacked;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Jul 20 19:05:05 2009
@@ -33,6 +33,8 @@
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
@@ -255,9 +257,9 @@
* @throws AMQException
*/
@Override
- public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
+ public QueueEntry enqueue(ServerMessage msg) throws AMQException
{
- messages.add( new HeadersExchangeTest.Message(msg));
+ messages.add( new HeadersExchangeTest.Message((AMQMessage) msg));
return new QueueEntry()
{
@@ -326,11 +328,6 @@
//To change body of implemented methods use File | Settings | File Templates.
}
- public String debugIdentity()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
public boolean immediateAndNotDelivered()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -438,7 +435,7 @@
}
- public ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeader()
{
try
{
@@ -522,7 +519,7 @@
void route(Exchange exchange) throws AMQException
{
- exchange.route(_incoming);
+ _incoming.enqueue(exchange.route(_incoming));
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java Mon Jul 20 19:05:05 2009
@@ -76,7 +76,7 @@
IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession);
- _exchange.route(message);
+ message.enqueue(_exchange.route(message));
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -100,7 +100,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -140,7 +140,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -159,7 +159,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -198,7 +198,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -217,7 +217,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -236,7 +236,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -254,7 +254,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -294,7 +294,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -312,7 +312,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -352,7 +352,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -384,7 +384,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -425,7 +425,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -464,7 +464,7 @@
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -495,7 +495,7 @@
private void routeMessage(final IncomingMessage message)
throws AMQException
{
- _exchange.route(message);
+ message.enqueue(_exchange.route(message));
message.routingComplete(_store, new MessageHandleFactory());
message.deliverToQueues();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Mon Jul 20 19:05:05 2009
@@ -22,16 +22,85 @@
import java.util.Map;
import java.util.HashMap;
+import java.util.Set;
import junit.framework.TestCase;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.message.AMQMessageHeader;
/**
*/
public class HeadersBindingTest extends TestCase
{
+
+ private class MockHeader implements AMQMessageHeader
+ {
+
+ private final Map<String, Object> _headers = new HashMap<String, Object>();
+
+ public String getCorrelationId()
+ {
+ return null;
+ }
+
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ public String getMessageId()
+ {
+ return null;
+ }
+
+ public byte getPriority()
+ {
+ return 0;
+ }
+
+ public long getTimestamp()
+ {
+ return 0;
+ }
+
+ public String getType()
+ {
+ return null;
+ }
+
+ public String getReplyTo()
+ {
+ return null;
+ }
+
+ public Object getHeader(String name)
+ {
+ return _headers.get(name);
+ }
+
+ public boolean containsHeaders(Set<String> names)
+ {
+ return _headers.keySet().containsAll(names);
+ }
+
+ public boolean containsHeader(String name)
+ {
+ return _headers.containsKey(name);
+ }
+
+ public void setString(String key, String value)
+ {
+ setObject(key,value);
+ }
+
+ public void setObject(String key, Object value)
+ {
+ _headers.put(key,value);
+ }
+ }
+
private FieldTable bindHeaders = new FieldTable();
- private FieldTable matchHeaders = new FieldTable();
+ private MockHeader matchHeaders = new MockHeader();
public void testDefault_1()
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Mon Jul 20 19:05:05 2009
@@ -42,19 +42,19 @@
{
// Enqueue messages in order
- _queue.enqueue(null, createMessage(1L, (byte) 10));
- _queue.enqueue(null, createMessage(2L, (byte) 4));
- _queue.enqueue(null, createMessage(3L, (byte) 0));
+ _queue.enqueue(createMessage(1L, (byte) 10));
+ _queue.enqueue(createMessage(2L, (byte) 4));
+ _queue.enqueue(createMessage(3L, (byte) 0));
// Enqueue messages in reverse order
- _queue.enqueue(null, createMessage(4L, (byte) 0));
- _queue.enqueue(null, createMessage(5L, (byte) 4));
- _queue.enqueue(null, createMessage(6L, (byte) 10));
+ _queue.enqueue(createMessage(4L, (byte) 0));
+ _queue.enqueue(createMessage(5L, (byte) 4));
+ _queue.enqueue(createMessage(6L, (byte) 10));
// Enqueue messages out of order
- _queue.enqueue(null, createMessage(7L, (byte) 4));
- _queue.enqueue(null, createMessage(8L, (byte) 10));
- _queue.enqueue(null, createMessage(9L, (byte) 0));
+ _queue.enqueue(createMessage(7L, (byte) 4));
+ _queue.enqueue(createMessage(8L, (byte) 10));
+ _queue.enqueue(createMessage(9L, (byte) 0));
// Register subscriber
_queue.registerSubscription(_subscription, false);
@@ -63,17 +63,17 @@
ArrayList<QueueEntry> msgs = _subscription.getMessages();
try
{
- assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
- assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
- assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
-
- assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
- assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
- assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
-
- assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
- assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
- assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
+ assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber());
+ assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber());
+ assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber());
+
+ assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber());
+ assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber());
+ assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber());
+
+ assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber());
+ assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber());
+ assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber());
}
catch (AssertionFailedError afe)
{
@@ -81,7 +81,7 @@
int index = 1;
for (QueueEntry qe : msgs)
{
- System.err.println(index + ":" + qe.getMessage().getMessageId());
+ System.err.println(index + ":" + qe.getMessage().getMessageNumber());
index++;
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Jul 20 19:05:05 2009
@@ -31,6 +31,7 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
@@ -160,7 +161,7 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+ public QueueEntry enqueue(ServerMessage message) throws AMQException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -280,7 +281,7 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
+
public void checkMessageStatus() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -321,7 +322,7 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
+
public void setMinimumAlertRepeatGap(long value)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Jul 20 19:05:05 2009
@@ -49,11 +49,6 @@
}
- public String debugIdentity()
- {
- return null;
- }
-
public boolean delete()
{
return false;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Jul 20 19:05:05 2009
@@ -185,7 +185,7 @@
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
+ _queue.enqueue(messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
// Check removing the subscription removes it's information from the queue
@@ -196,7 +196,7 @@
1 == _queue.getActiveConsumerCount());
AMQMessage messageB = createMessage(new Long (25));
- _queue.enqueue(null, messageB);
+ _queue.enqueue(messageB);
QueueEntry entry = _subscription.getLastSeenEntry();
assertNull(entry);
}
@@ -204,7 +204,7 @@
public void testQueueNoSubscriber() throws AMQException, InterruptedException
{
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
+ _queue.enqueue(messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
@@ -223,7 +223,7 @@
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
+ _queue.enqueue(messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
@@ -261,7 +261,7 @@
_queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
_queue.registerSubscription(_subscription, false);
AMQMessage message = createMessage(new Long(25));
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
_queue.unregisterSubscription(_subscription);
assertTrue("Queue was not deleted when subscription was removed",
_queue.isDeleted());
@@ -272,7 +272,7 @@
_queue.registerSubscription(_subscription, false);
Long id = new Long(26);
AMQMessage message = createMessage(id);
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
QueueEntry entry = _subscription.getLastSeenEntry();
entry.setRedelivered(true);
_queue.resend(entry, _subscription);
@@ -286,7 +286,7 @@
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
@@ -302,7 +302,7 @@
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -323,7 +323,7 @@
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Mon Jul 20 19:05:05 2009
@@ -44,6 +44,7 @@
{
super.setUp();
_store = new TestMemoryMessageStore();
+ StoreContext.setCurrentContext(_storeContext);
}
/**
@@ -96,7 +97,7 @@
assertEquals(1, _store.getMessageMetaDataMap().size());
- message.decrementReference(_storeContext);
+ message.decrementReference();
assertEquals(1, _store.getMessageMetaDataMap().size());
}
@@ -158,7 +159,7 @@
assertEquals(1, _store.getMessageMetaDataMap().size());
message = message.takeReference();
- message.decrementReference(_storeContext);
+ message.decrementReference();
assertEquals(1, _store.getMessageMetaDataMap().size());
}
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Mon Jul 20 19:05:05 2009
@@ -904,10 +904,13 @@
}
}
+ public Object get(String key)
+ {
+ return get(new AMQShortString(key));
+ }
public Object get(AMQShortString key)
{
-
return getObject(key);
}
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Jul 20 19:05:05 2009
@@ -55,7 +55,7 @@
private static final Logger log = Logger.get(Connection.class);
- enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+ public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
class DefaultConnectionListener implements ConnectionListener
{
@@ -118,7 +118,7 @@
sender.setIdleTimeout(idleTimeout);
}
- void setState(State state)
+ protected void setState(State state)
{
synchronized (lock)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Mon Jul 20 19:05:05 2009
@@ -52,13 +52,28 @@
{
private SaslServer saslServer;
+ private List<Object> _locales;
+ private List<Object> _mechanisms;
+ private Map<String, Object> _clientProperties;
+
+
+ public ServerDelegate()
+ {
+ this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8"));
+ }
+
+ protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales)
+ {
+ _clientProperties = clientProperties;
+ _mechanisms = mechanisms;
+ _locales = locales;
+ }
public void init(Connection conn, ProtocolHeader hdr)
{
conn.send(new ProtocolHeader(1, 0, 10));
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
+
+ conn.connectionStart(_clientProperties, _mechanisms, _locales);
}
@Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
@@ -77,8 +92,8 @@
try
{
- SaslServer ss = Sasl.createSaslServer
- (mechanism, "AMQP", "localhost", null, null);
+
+ SaslServer ss = createSaslServer(mechanism);
if (ss == null)
{
conn.connectionClose
@@ -95,6 +110,14 @@
}
}
+ protected SaslServer createSaslServer(String mechanism)
+ throws SaslException
+ {
+ SaslServer ss = Sasl.createSaslServer
+ (mechanism, "AMQP", "localhost", null, null);
+ return ss;
+ }
+
private void secure(Connection conn, byte[] response)
{
SaslServer ss = conn.getSaslServer();
@@ -133,9 +156,16 @@
@Override public void connectionOpen(Connection conn, ConnectionOpen open)
{
conn.connectionOpenOk(Collections.EMPTY_LIST);
+
conn.setState(OPEN);
}
+ protected Session getSession(Connection conn, SessionDelegate delegate, SessionAttach atc)
+ {
+ return new Session(conn, delegate, new Binary(atc.getName()), 0);
+ }
+
+
public Session getSession(Connection conn, SessionAttach atc)
{
return new Session(conn, new Binary(atc.getName()), 0);
Modified: qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=795958&r1=795957&r2=795958&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Jul 20 19:05:05 2009
@@ -82,7 +82,7 @@
private Binary name;
private long expiry;
private int channel;
- private SessionDelegate delegate = new SessionDelegate();
+ private SessionDelegate delegate;
private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
@@ -112,9 +112,15 @@
private Thread resumer = null;
- Session(Connection connection, Binary name, long expiry)
+ protected Session(Connection connection, Binary name, long expiry)
+ {
+ this(connection, new SessionDelegate(), name, expiry);
+ }
+
+ protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this.connection = connection;
+ this.delegate = delegate;
this.name = name;
this.expiry = expiry;
initReceiver();
Added: qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java?rev=795958&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java Mon Jul 20 19:05:05 2009
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+
+public class SimpleConnectionTest extends TestCase
+{
+ public void testConnection()
+ {
+ try
+ {
+ AMQConnection conn = new AMQConnection("127.0.0.1", 5673, "guest", "guest", "test", "/test");
+ QueueSession s = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSender p = s.createSender(new AMQQueue("amq.direct", "queue"));
+ p.send(s.createTextMessage("test"));
+
+ QueueReceiver r = s.createReceiver(new AMQQueue("amq.direct", "queue"));
+ conn.start();
+ Message m = r.receive();
+
+ Thread.sleep(60000L);
+ conn.close();
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (URLSyntaxException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org